Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-10-19 Thread Matthias J. Sax
What is the status of this KIP?

-Matthias

On 7/19/18 5:17 PM, Guozhang Wang wrote:
> Hello Florian,
> 
> Sorry for being late... I find myself apologizing for late replies a lot
> these days. But I do want to push this KIP forward, as I see it as a
> very important and helpful feature for extensibility.
> 
> About the exceptions, I've gone through them and hopefully it is an
> exhaustive list:
> 
> 1. KTable#toStream()
> 2. KStream#merge(KStream)
> 3. KStream#process() / transform() / transformValues()
> 4. KGroupedTable / KGroupedStream#count()
> 
> 
> Here's my reasoning:
> 
> * It is okay not to let users override the name for 1/2, since they are
> too trivial to be useful for debugging; plus, their processor names would
> not determine any related topic / store names.
> * For 3, I'd vote for adding overloaded functions with Named.
> * For 4, if users really want to name the processor, they can call
> aggregate() instead, so I think it is okay to skip this case.
> 
> 
> Guozhang
> 
> 
> 
> On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois 
> wrote:
> 
>> Hi,
>>
>> Option #3 seems to be a good alternative, and I find the API more
>> elegant (thanks John).
>>
>> But we still need to overload some methods, either because they do
>> not accept an action instance or because they are translated to multiple
>> processors.
>>
>> For example, this is the case for the methods branch() and merge(). We could
>> introduce a new interface Named (or maybe a different name?) with a method
>> name(). All action interfaces could extend this one to implement
>> option 3).
>> This would result in the following overloads:
>>
>> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
>> KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates);
>>
>> N.B.: The list above is not exhaustive.
>>
>> -
>> The user's code would become:
>>
>> KStream stream = builder.stream("test");
>> KStream[] branches = stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
>>     Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
>>     Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
>>
>> branches[0].to("pair");
>> branches[1].to("impair");
>> -
>>
>> This is a mix of options 3) and 1).
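To make the option 3 mechanics concrete, here is a minimal, self-contained sketch. `NamedPredicate` and its `named(...)` factory are hypothetical stand-ins, not Kafka's actual `Predicate` API: a default `name()` method keeps the interface usable with plain lambdas, and a static factory attaches a name by wrapping the lambda.

```java
// Hypothetical stand-in for a DSL function interface under option 3.
// test() is the only abstract method, so plain lambdas still compile.
interface NamedPredicate<K, V> {
    boolean test(K key, V value);

    // Default: no user-supplied name; a DSL would fall back to a generated one.
    default String name() {
        return null;
    }

    // Static factory that attaches a name to a lambda by wrapping it.
    static <K, V> NamedPredicate<K, V> named(final String name,
                                             final NamedPredicate<K, V> delegate) {
        return new NamedPredicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return delegate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}

class NamedPredicateDemo {
    public static void main(final String[] args) {
        NamedPredicate<String, Integer> odd = (k, v) -> v % 2 != 0;
        NamedPredicate<String, Integer> even =
            NamedPredicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0);

        System.out.println(odd.name());        // null
        System.out.println(even.name());       // STREAM-PAIR-VALUE
        System.out.println(even.test("a", 4)); // true
    }
}
```

This also shows the trade-off John raises later in the thread: the name lives on the wrapper, so a bare lambda can never carry one by itself.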
>>
>> On Fri, Jul 6, 2018 at 22:58, Guozhang Wang wrote:
>>
>>> Hi folks, just to summarize the options we have so far:
>>>
>>> 1) Add a new "as" for KTable / KStream, plus add new fields to the
>>> control objects of operators that return void (the current wiki's proposal).
>>>
>>> Pros: no more overloads.
>>> Cons: departs a bit from the current high-level API design of the DSL;
>>> plus, it introduces an inconsistency between operators that return void
>>> and operators that do not.
>>>
>>> 2) Add overloaded functions for all operators that accept a new control
>>> object "Described".
>>>
>>> Pros: consistent with current APIs.
>>> Cons: lots of overloaded functions to add.
>>>
>>> 3) Add another default method to the function interfaces (thank you,
>>> Java 8!), as John proposed.
>>>
>>> Pros: no overloaded functions, no "Described".
>>> Cons: do we really lose lambda functions (it seems not, if we provide a
>>> "named" for each function)? Plus, "Described" may be more extensible than
>>> a single `String`.
>>>
>>>
>>> My main criterion for deciding which one is better is "how to let
>>> advanced users easily use the additional API, while keeping it hidden
>>> from normal users who do not care at all". By that criterion, I think
>>> 3) > 1) > 2).
>>>
>>> One caveat, though, is that changing the interface would be
>>> source-compatible but not binary-compatible, right? I.e., users would
>>> need to recompile their code, though no code changes would be needed.
>>>
>>>
>>>
>>> Another note: for 3), if we really want to keep the extensibility of
>>> Described, we could do something like:
>>>
>>> -
>>>
>>> public interface Predicate<K, V> {
>>>     // existing method
>>>     boolean test(final K key, final V value);
>>>
>>>     // new default method adds the ability to name the predicate
>>>     default Described described() {
>>>         return new Described(null);
>>>     }
>>> }
>>>
>>> --
>>>
>>> where the user's code becomes:
>>>
>>> // note: `named` now just sets a Described("key") in "described()"
>>> stream.filter(named("key", (k, v) -> true));
>>>
>>> stream.filter(described(Described.as("key" /* , any other fancy
>>>     parameters in the future */), (k, v) -> true));
>>> --
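Guozhang's extensible variant can be sketched the same way, again with hypothetical `Described` and `DescribedPredicate` types rather than real Kafka classes. The function interface exposes a `Described` holder instead of a bare `String`, and `named(...)` is only sugar over `described(...)`, which is what would allow extra options to be added later without touching the function interfaces again.

```java
// Hypothetical holder for naming options. Only a name for now, but more
// fields could be added later without changing the function interfaces.
final class Described {
    private final String name;

    private Described(final String name) {
        this.name = name;
    }

    static Described as(final String name) {
        return new Described(name);
    }

    String name() {
        return name;
    }
}

// Hypothetical function interface: test() remains the single abstract method.
interface DescribedPredicate<K, V> {
    boolean test(K key, V value);

    // Default: an empty description, mirroring `new Described(null)` above.
    default Described described() {
        return Described.as(null);
    }

    // General form: attach any Described to a lambda.
    static <K, V> DescribedPredicate<K, V> described(final Described description,
                                                     final DescribedPredicate<K, V> predicate) {
        return new DescribedPredicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public Described described() {
                return description;
            }
        };
    }

    // Sugar: named(...) only sets the name inside a Described.
    static <K, V> DescribedPredicate<K, V> named(final String name,
                                                 final DescribedPredicate<K, V> predicate) {
        return described(Described.as(name), predicate);
    }
}

class DescribedDemo {
    public static void main(final String[] args) {
        DescribedPredicate<String, Integer> p = DescribedPredicate.named("key", (k, v) -> true);
        System.out.println(p.described().name()); // key
    }
}
```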
>>>
>>>
>>> I feel it is not very likely that we'd need to extend it further in the
>>> future, so just a `String` would be good enough. But I'm just listing all
>>> the possibilities here.
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler  wrote:
>>>
>>>> Hi Florian,
>>>>
>>>> Sorry I'm late to the party, but I missed the message originally.
>>>>
>>>> Regarding the names, it's probably a good idea to stick to the same
>>>> character

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-11-12 Thread Florian Hussonnois
Hi Matthias,

Sorry I was absent for a while. I have started a new PR for this KIP. It is
still in progress for now. I'm working on it.
https://github.com/apache/kafka/pull/5909

On Fri, Oct 19, 2018 at 20:13, Matthias J. Sax wrote:

> What is the status of this KIP?

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-11-16 Thread Guozhang Wang
Thanks Florian! I will take a look at the PR.



On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois 
wrote:

> Hi Matthias,
>
> Sorry I was absent for a while. I have started a new PR for this KIP. It is
> still in progress for now. I'm working on it.
> https://github.com/apache/kafka/pull/5909

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-11-27 Thread Guozhang Wang
Hi Florian,

I've made a pass over the PR. There are some comments that are related to
the function names which may be affecting the KIP wiki page, but overall I
think it looks good already.


Guozhang


On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang  wrote:

> Thanks Florian! I will take a look at the PR.

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-10 Thread John Roesler
Hi Florian,

I hope it's ok if I ask a few questions at this late stage...

Comment 1 ==

It seems like the proposal is to add a new "Named" interface that is
intended to be mixed in with the existing API objects at various points.

Just to preface some of my comments, it looks like your KIP was created
quite a while ago, so the API may have changed somewhat since you started.

As I see the API, there are a few different kinds of DSL method arguments:
* functions: things like Initializer, Aggregator, ValueJoiner,
ForeachAction... All of these are essentially Streams-flavored Function
interfaces with different arities, type bounds, and semantics.
* config objects: things like Produced, Consumed, Joined, Grouped... These
are containers for configurations, where the target of the configuration is
the operation itself.
* raw configurations: things like a raw topic-name string and Materialized:
These are configurations for operations that have no config object, and for
various reasons, we didn't make one. The distinguishing feature is that the
target of the configuration is not the operation itself, but some aspect of
it. For example, with Materialized we are not setting the caching behavior
of the operation itself (say, an aggregation); we're setting the caching
behavior of a materialized state store attached to the aggregation.

It seems like choosing to mix the Named interface in with the functions has
a couple of unfortunate side-effects:
* Aggregator is not the only function passed to any of the relevant
aggregate methods, so it seems a little arbitrary to pick that function
over Initializer or Merger.
* As you noted, branch() takes an array of Predicate, so we just ignore the
provided name(s), even though Predicate names are used elsewhere.
* Not all things that we want to name have function arguments, notably
source and sink, so we'd switch paradigms and use the config object instead.
* Adding an extra method to the function interfaces means that those are no
longer SAM interfaces. You proposed to add a default implementation, so we
could still pass a lambda if we don't want to set the name, but if we *do*
want to set the name, we can no longer use lambdas.

I think the obvious other choice would be to mix Named in with the config
objects instead, but this has one main downside of its own...
* Not every operator we wish to name has a config object. I don't know if
everyone involved is comfortable with adding a config object to every
operator that's missing one.

Personally, I favor moving toward a more consistent state that's forward
compatible with any further changes we wish to make. I *think* that giving
every operator two forms (one with no config and one with a config object)
would be such an API.
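A rough sketch of the "two forms per operator" shape described above, assuming a hypothetical `NamedConfig` object and a toy `ToyStream`/`ToyTopology` pair that only records processor names (none of these are Kafka classes). When no config is passed, a name is generated in the style of the `KSTREAM-FILTER-0000000001` names Streams produces; when one is passed, its name wins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical config object that carries only a processor name.
final class NamedConfig {
    final String name;

    private NamedConfig(final String name) {
        this.name = name;
    }

    static NamedConfig as(final String name) {
        return new NamedConfig(name);
    }
}

// Toy topology that only records processor names; nothing is processed.
final class ToyTopology {
    private final List<String> names = new ArrayList<>();
    private int index = 0;

    // Use the user's name if given, else a generated "PREFIX-%010d" name.
    void addProcessor(final String prefix, final NamedConfig config) {
        final String generated = prefix + String.format("%010d", index++);
        names.add(config != null && config.name != null ? config.name : generated);
    }

    List<String> names() {
        return names;
    }
}

final class ToyStream<K, V> {
    private final ToyTopology topology;

    ToyStream(final ToyTopology topology) {
        this.topology = topology;
    }

    // Form 1: no config object, so the name is generated.
    ToyStream<K, V> filter(final BiPredicate<? super K, ? super V> predicate) {
        return filter(predicate, null);
    }

    // Form 2: the same operator plus a config object.
    // (The predicate is unused because this toy only models naming.)
    ToyStream<K, V> filter(final BiPredicate<? super K, ? super V> predicate,
                           final NamedConfig config) {
        topology.addProcessor("KSTREAM-FILTER-", config);
        return new ToyStream<>(topology);
    }
}

class TwoFormsDemo {
    public static void main(final String[] args) {
        final ToyTopology topology = new ToyTopology();
        new ToyStream<String, Integer>(topology)
            .filter((k, v) -> v > 0)
            .filter((k, v) -> v < 10, NamedConfig.as("small-values"));
        System.out.println(topology.names()); // [KSTREAM-FILTER-0000000000, small-values]
    }
}
```

Callers who never care about names only ever see form 1, which matches the goal of keeping the extra API out of the way of normal users.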

Comment 2 =

Finally, just a minor comment: the static method in Named wouldn't work
properly as defined. Assuming that we mix Named in with Produced, for
example, we'd need to be able to use it like:
>  kStream.to("out", Produced.with("myOut"))
This doesn't work because with() returns a Named, but we need a Produced.

We can pull off a builder method in the interface, but not a static method.
To define a builder method in the interface that returns an instance of the
concrete subtype, you have to use the "curiously recurring generic" pattern.

It would look like:

public interface Named<N extends Named<N>> {
  String name();
  N withName(String name);
}

You can see where the name of the pattern comes from ;)
An implementation would then look like:

public class Produced<K, V> implements Named<Produced<K, V>> {
  private String name;

  public String name() { return name; }
  public Produced<K, V> withName(final String name) { this.name = name; return this; }
}

Note that the generic parameter gets filled in properly in the implementing
class, so that you get the right return type out.

It doesn't work at all with a static factory method at the interface level,
so it would be up to Produced to define a static factory if it wants to
present one.
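The pattern can be checked with a compilable sketch. `ProducedSketch` and its `withExtraSetting` method are hypothetical stand-ins for `Produced`, not Kafka's class; the point is that `withName` returns the concrete type, so subtype-only methods can be chained after it, while the static factory has to live on the concrete class.

```java
// Self-referential ("curiously recurring") bound: N is the implementing type.
interface Named<N extends Named<N>> {
    String name();

    N withName(String name);
}

// Hypothetical config object standing in for Produced.
final class ProducedSketch<K, V> implements Named<ProducedSketch<K, V>> {
    private String name;
    private String extraSetting; // hypothetical subtype-only setting

    private ProducedSketch() {
    }

    // The static factory lives on the concrete class, not on the interface.
    static <K, V> ProducedSketch<K, V> as(final String name) {
        return new ProducedSketch<K, V>().withName(name);
    }

    @Override
    public String name() {
        return name;
    }

    // Returns ProducedSketch<K, V> rather than Named, so chaining keeps the subtype.
    @Override
    public ProducedSketch<K, V> withName(final String name) {
        this.name = name;
        return this;
    }

    ProducedSketch<K, V> withExtraSetting(final String extraSetting) {
        this.extraSetting = extraSetting;
        return this;
    }

    String extraSetting() {
        return extraSetting;
    }
}

class CrtpDemo {
    public static void main(final String[] args) {
        // withExtraSetting() is reachable only because withName() returns the concrete type.
        final ProducedSketch<String, Long> produced =
            ProducedSketch.<String, Long>as("tmp").withName("myOut").withExtraSetting("x");
        System.out.println(produced.name()); // myOut
    }
}
```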

==

Those are my two pieces of feedback!

I hope you find this helpful, rather than frustrating. I'm sorry I didn't
get a chance to comment sooner.

Thanks for the KIP, I think it will be much nicer to be able to name the
processor nodes.

-John

On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang  wrote:

> Hi Florian,
>
> I've made a pass over the PR. There are some comments that are related to
> the function names which may be affecting the KIP wiki page, but overall I
> think it looks good already.
>
>
> Guozhang

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-10 Thread Guozhang Wang
I had one meta comment on the PR:
https://github.com/apache/kafka/pull/5909#discussion_r240447153

On Mon, Dec 10, 2018 at 5:22 PM John Roesler  wrote:

> Hi Florian,
>
> I hope it's ok if I ask a few questions at this late stage...

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-11 Thread Florian Hussonnois
Thank you very much for your feedback.

Currently, there is still a lot of discussion regarding the Named interface.
On the one hand, we should provide consistency across the Streams API; on
the other hand, we should not break the semantics, as John pointed out.

Guozhang, I'm sorry, but I'm a little bit confused; maybe I missed something.
In your comment you suggested that:
* Produced/Consumed/Suppressed should extend Named
* Named should have a package-private method to get the specified processor
name internally (processorName())
* Finally, we should end up with something like: Named -> XXX ->
XXXInternal, or Named -> Produced -> ProducedInternal

The objectives behind that are to:
* consolidate the internal method processorName()
* consolidate the withName method that now exists in Produced,
Consumed and Suppressed.

But Named is an interface, so we can't define a package-private method on
it. Also, Produced and ProducedInternal, for example, are not in the same
package, so having a package-private method doesn't really help.
In addition, if we add the withName method to the Named interface, this can
become confusing for developers, because the action interfaces (ValueMapper,
Reducer, etc.) extend it.
The interface would look like:

public interface Named<T extends Named<T>> {
    default String name() {
        return null;
    }
    default Named<T> withName(final String name) {
        return null;
    }
    ...
}

So maybe, instead of adding another method to Named, we could create a new
package-private class that could be extended by
Produced/Consumed/Joined/Suppressed. For example:

import java.util.Objects;

class SettableName<T extends SettableName<T>> implements Named<T> {

    protected String processorName;

    SettableName(final SettableName<?> settable) {
        this(Objects.requireNonNull(settable, "settable can't be null").name());
    }

    SettableName(final String processorName) {
        this.processorName = processorName;
    }

    @Override
    public String name() {
        return processorName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T withName(final String processorName) {
        this.processorName = processorName;
        return (T) this;
    }
}

In that way, we will get : public class Produced implements
SettableName { ...

WDYT?


On Tue, Dec 11, 2018 at 02:46, Guozhang Wang wrote:

> I had one meta comment on the PR:
> https://github.com/apache/kafka/pull/5909#discussion_r240447153
>
> On Mon, Dec 10, 2018 at 5:22 PM John Roesler  wrote:
>
> > Hi Florian,
> >
> > I hope it's ok if I ask a few questions at this late stage...
> >
> > Comment 1 ==
> >
> > It seems like the proposal is to add a new "Named" interface that is
> > intended to be mixed in with the existing API objects at various points.
> >
> > Just to preface some of my comments, it looks like your KIP was created
> > quite a while ago, so the API may have changed somewhat since you started.
> >
> > As I see the API, there are a few different kinds of DSL method arguments:
> > * functions: things like Initializer, Aggregator, ValueJoiner,
> > ForEachAction... All of these are essentially Streams-flavored Function
> > interfaces with different arities, type bounds, and semantics.
> > * config objects: things like Produced, Consumed, Joined, Grouped... These
> > are containers for configurations, where the target of the configuration is
> > the operation itself
> > * raw configurations: things like a raw topic-name string and Materialized:
> > These are configurations for operations that have no config object, and for
> > various reasons, we didn't make one. The distinguishing feature is that the
> > target of the configuration is not the operation itself, but some aspect of
> > it. For example, in Materialized, we are not setting the caching behavior
> > of, for example, an aggregation; we're setting the caching behavior of a
> > materialized state store attached to the aggregation.
> >
> > It seems like choosing to mix the Named interface in with the functions has
> > a couple of unfortunate side-effects:
> > * Aggregator is not the only function passed to any of the relevant
> > aggregate methods, so it seems a little arbitrary to pick that function
> > over Initializer or Merger.
> > * As you noted, branch() takes an array of Predicate, so we just ignore the
> > provided name(s), even though Predicate names are used elsewhere.
> > * Not all things that we want to name have function arguments, notably
> > source and sink, so we'd switch paradigms and use the config object
> > instead.
> > * Adding an extra method to the function interfaces means that those are no
> > longer SAM interfaces. You proposed to add a default implementation, so we
> > could still pass a lambda if we don't want to set the name, but if we *do*
> > want to set the name, we can no longer use lambdas.
> >
> > I think the obvious other choice would be to mix Named in with the config
> > objects instead, but this has one main downside of its own...
> > * not every operator we wish to name has a config object. I don
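John's point about lambdas can be illustrated with a small, self-contained sketch. `NamedPredicate` is a hypothetical stand-in for a function interface carrying a default name() method, not Kafka's Predicate. Because name() has a default body, the interface still has a single abstract method and an unnamed lambda compiles; setting a name, however, requires an anonymous (or named) class:

```java
// Hypothetical sketch: a predicate-like function interface with a default
// name(); lambdas still work, but giving it a name needs a class.
public class LambdaNamingSketch {

    interface NamedPredicate<K, V> {
        boolean test(K key, V value); // the single abstract method

        default String name() {
            return null; // unnamed by default
        }
    }

    // Unnamed: a plain lambda compiles because name() has a default body.
    static final NamedPredicate<String, Integer> POSITIVE = (k, v) -> v > 0;

    // Named: a lambda cannot override name(), so an anonymous class is needed.
    static final NamedPredicate<String, Integer> NAMED_POSITIVE =
        new NamedPredicate<String, Integer>() {
            @Override
            public boolean test(final String key, final Integer value) {
                return value > 0;
            }

            @Override
            public String name() {
                return "filter-positive";
            }
        };

    public static void main(final String[] args) {
        System.out.println(POSITIVE.test("a", 1) + " " + POSITIVE.name());
        System.out.println(NAMED_POSITIVE.test("a", -1) + " " + NAMED_POSITIVE.name());
    }
}
```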

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-13 Thread Matthias J. Sax
Just catching up on this discussion.

My overall personal take is that I am not a big fan of the interface
`Named` being used as a factory. I would rather add a control-object
parameter to all methods that don't have one yet. This KIP was started a
while ago, and we have added new naming capabilities in the meantime.
Guozhang's example in the PR comment about naming in a stream-stream join
shows that we might end up with a confusing situation for users if we use
`Named`. Also, as of 2.1, users can already name repartition/changelog
topics and stores. So KIP-307 boils down to providing non-functional
naming?

Hence, for all methods that already allow specifying names, I don't see
any reason to change them; instead, we should use the existing API to also
name the processor(s) rather than allowing users to specify a new name.

About the inconsistency in method naming: I agree that `as` is very
generic and maybe not the best choice.

I think it might be helpful to have a table in the KIP that lists all
existing static/non-static methods that allow specifying a name, plus a
column with the newly suggested name for each of those methods?

Thoughts?


-Matthias


On 12/12/18 12:45 AM, Florian Hussonnois wrote:

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-13 Thread John Roesler
Hi again, all,

Matthias, I agree with you.

Florian, thanks for your response.

I think your proposal is the best way to address the ask for hiding the
name() getter. But I'd like to question that ask and instead propose that
we just make the name() getter part of the public API.

The desire to "hide" the getters causes a lot of complexity in our code
base, and it will become completely impractical with the mixin strategy of
Named.

If we were to switch strategies back to mixing Named into the control
objects rather than the functions, then the path forward becomes quite
clear.

On the other hand, letting anyone query the name from a control object
after setting it seems harmless, so my vote would be simply to keep the
Named interface as:

public interface Named<T extends Named<T>> {
    String name();
    T withName(String name);
}

Under this proposal, we only mix Named into the control objects, which
means we no longer need default implementations (because we can update
all the control objects at the same time as we add this interface to
them).

This does hinge on switching over to a control-object-only strategy, which
introduces the need to add about 50 new control object classes, which would
only serve to implement Named. As a middle ground, maybe we could just add
one generic control object class, like:

public class NamedOperation implements Named<NamedOperation> {
    private final String name;

    private NamedOperation(final String name) { this.name = name; }

    public static NamedOperation name(final String name) {
        return new NamedOperation(name);
    }

    @Override
    public String name() { return name; }

    @Override
    public NamedOperation withName(final String name) {
        return new NamedOperation(name);
    }
}

And then, we'd add overloads for all the methods that don't already have
control objects (for example, filter()):

// existing
KStream<K, V> filter(Predicate<? super K, ? super V> predicate);

// new
KStream<K, V> filter(Predicate<? super K, ? super V> predicate,
                     NamedOperation named);
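A minimal sketch of how such an overload could behave from the caller's side, assuming a toy builder (`ToyStream`) that only records processor names. `ToyStream`, the use of `java.util.function.BiPredicate` as the predicate type, and the generated-name format are illustrative assumptions, not Kafka Streams internals; `Named` and `NamedOperation` mirror the proposal above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical toy model, not Kafka Streams: it only records the processor
// names a filter() call would add to a topology; no records are processed.
public class FilterOverloadSketch {

    interface Named<T extends Named<T>> {
        String name();

        T withName(String name);
    }

    static final class NamedOperation implements Named<NamedOperation> {
        private final String name;

        private NamedOperation(final String name) {
            this.name = name;
        }

        static NamedOperation name(final String name) {
            return new NamedOperation(name);
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public NamedOperation withName(final String name) {
            return new NamedOperation(name);
        }
    }

    static final class ToyStream<K, V> {
        final List<String> processorNames;

        ToyStream(final List<String> processorNames) {
            this.processorNames = processorNames;
        }

        // Existing-style overload: the processor name is generated.
        ToyStream<K, V> filter(final BiPredicate<? super K, ? super V> predicate) {
            return filter(predicate, null);
        }

        // New-style overload: the caller may supply the processor name.
        ToyStream<K, V> filter(final BiPredicate<? super K, ? super V> predicate,
                               final NamedOperation named) {
            final String name = (named != null && named.name() != null)
                ? named.name()
                : "generated-filter-" + processorNames.size();
            processorNames.add(name);
            return this;
        }
    }

    public static void main(final String[] args) {
        final ToyStream<String, Integer> stream = new ToyStream<>(new ArrayList<>());
        stream.filter((k, v) -> v > 0)
              .filter((k, v) -> v < 100, NamedOperation.name("drop-large-values"));
        System.out.println(stream.processorNames);
    }
}
```

In this sketch, calling filter() without a NamedOperation keeps the generated name, while passing one only overrides the name of that single processor.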

Additionally, in regard to Matthias's point about existing control objects
with naming semantics, they would implement Named (but not extend
NamedOperation) for uniformity.

You provided a good approach to hide the getter with your SettableName
class; I think what you proposed is the only way we could hide the name.
In the end, though, it's a lot of complexity added (control object class
hierarchy, inheritance, mutable state, internal casting) for something of
dubious value: to be able to hide the name from someone *after they
themselves have set it*.

Although it'll be a pain, perhaps Matthias's suggestion to enumerate all
the API methods is the best way to be sure we all agree on what's going to
happen.

Thanks again for wrangling with this issue,
-John

On Thu, Dec 13, 2018 at 9:03 AM Matthias J. Sax 
wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-13 Thread Florian Hussonnois
Hi all,

Thanks again. I agree with your proposals.
Also, IMHO, overloading all methods (filter, map) to accept a new control
object seems to provide a more natural development experience for users.

Actually, this was the first proposal for this KIP, but we rejected it
because it led to adding a lot of new methods.
As you mentioned, the API has evolved since this KIP was created: some
existing control objects already allow customizing internal names, so we
should keep to that strategy.

If everyone is OK with that, I will update the KIP and the PR accordingly.

Thanks.

On Thu, Dec 13, 2018 at 18:08, John Roesler wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-14 Thread John Roesler
Hi Florian,

Sorry about the run-around of rejecting the original proposal,
only to return to it later on. Hopefully, it's more encouraging
than frustrating that we're coming around to your initial way of
thinking.

Thanks!
-John

On Thu, Dec 13, 2018 at 4:28 PM Florian Hussonnois 
wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-14 Thread Guozhang Wang
Hello Florian,

I really appreciate your patience.

I know that we've discussed the approach of adding overloaded functions
and rejected it early on. But looking deeper into the current PR, I
realized that its approach risks causing a lot of API confusion for users
(I tried to explain my thoughts in the PR, but it was not very clear).
The basic idea is that today we already have a few control classes,
including Grouped, Joined and Suppressed, that allow users to specify
serdes etc., as well as a "name" that can then be used to define the
processor name / internal topic names in the topology (the static
function names are not consistent, which I think we should fix as well).
And the Named interface, by being extended by lambda function interfaces
like ValueJoiner / Predicate etc., opens the door to yet another way of
specifying names.

So, in order to achieve consistency, we are generally left with two options:

1) Only allow users to specify names via the lambda interfaces that extend
the Named interface. This means we would have to remove the naming
mechanism from the existing control objects to keep consistency.

2) Only allow users to specify names via control classes, and introduce a
new class (Named) for operators that do not have one yet. This leads to
the overloaded functions.

I did a quick count of the overloaded functions: summing KTable (8),
KStream (15), KGroupedStream (6), KGroupedTable (6), TimeWindowedKStream (6)
and SessionWindowedKStream (6), we get about 47 overloaded functions (our
guess of about 50 was pretty close!). Note that this is based on John's
proposal that we let the existing Grouped / Joined classes implement Named,
so we only need overloaded functions taking a default NamedOperation for
the operators that do not already have a control class.
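As a quick check of the arithmetic, the per-interface counts above (taken as given from the quick count, not a verified inventory of the API) add up as follows:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Re-adds the quick per-interface overload counts quoted in the email.
public class OverloadCount {

    static Map<String, Integer> counts() {
        final Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("KTable", 8);
        counts.put("KStream", 15);
        counts.put("KGroupedStream", 6);
        counts.put("KGroupedTable", 6);
        counts.put("TimeWindowedKStream", 6);
        counts.put("SessionWindowedKStream", 6);
        return counts;
    }

    static int total() {
        return counts().values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(final String[] args) {
        System.out.println(total()); // 47
    }
}
```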

Thinking about this approach, I feel it is not too bad compared with either
1) above, which would require us to deprecate a lot of public functions
around name(), or a mixed naming mechanism, which could lead to very
confusing behavior for users. Additionally, most users would only want to
specify names for the stateful operations that have internal topics /
state stores (and hence care more about upgrade compatibility); for them,
the added overloads would be rarely used functions anyway. And by letting
the existing control classes implement Named, we can have a unified method
name for the static constructor as well.



Guozhang


On Fri, Dec 14, 2018 at 10:24 AM John Roesler  wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-12-17 Thread Guozhang Wang
Hi Florian / John,

Just wanted to throw out a couple of minor thoughts on the current proposal:

1) Regarding the interface / function name, I'd propose we call the
interface `NamedOperation`, which would be implemented by Produced /
Consumed / Printed / Joined / Grouped / Suppressed (note I intentionally
exclude Materialized here since its semantics are quite different), and
call the default class that implements `NamedOperation` `Named`, which
would be used in the overloaded functions we are adding. The main reason
is to have consistency in naming.
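A hedged sketch of this naming, with `NamedOperation` as the interface and `Named` as its default implementation. The static factory `Named.as(...)`, the `PrintedLike` control object, and the `processorName` helper are hypothetical names chosen for illustration; the interface keeps John's name()/withName() pair:

```java
// Hypothetical sketch, not the Kafka Streams API: control objects implement
// NamedOperation; Named is the default implementation for the new overloads.
public class NamedOperationSketch {

    interface NamedOperation<T extends NamedOperation<T>> {
        String name();

        T withName(String name);
    }

    // Default implementation for operators that have no control object yet.
    static final class Named implements NamedOperation<Named> {
        private final String name;

        private Named(final String name) {
            this.name = name;
        }

        static Named as(final String name) { // factory name is an assumption
            return new Named(name);
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public Named withName(final String name) {
            return new Named(name);
        }
    }

    // Existing-style control object (loosely modelled on Printed) with another
    // setting besides the name; withName() keeps that setting.
    static final class PrintedLike implements NamedOperation<PrintedLike> {
        private final String label;
        private final String name;

        PrintedLike(final String label, final String name) {
            this.label = label;
            this.name = name;
        }

        String label() {
            return label;
        }

        @Override
        public String name() {
            return name;
        }

        @Override
        public PrintedLike withName(final String name) {
            return new PrintedLike(label, name);
        }
    }

    // Code that only needs the processor name accepts either kind of object.
    static String processorName(final NamedOperation<?> op, final String generated) {
        return op.name() != null ? op.name() : generated;
    }

    public static void main(final String[] args) {
        final PrintedLike printed = new PrintedLike("debug", null).withName("print-orders");
        System.out.println(processorName(printed, "PRINTER-0"));
        System.out.println(processorName(Named.as("filter-orders"), "FILTER-0"));
    }
}
```

The design point in this sketch is that a control object carrying other settings keeps them when renamed, while operators without a control object fall back to the plain Named class.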

2) As a minor tweak, I think it's better to use Joined.name() both for the
possibly generated repartition topic and for the map processor used for
group-by (currently this name is only used for the repartition topic).


Florian: if you think this proposal makes sense, please feel free to go
ahead and update the PR; after we have made a first pass on it and feel
confident about it, we can go ahead with the VOTING process. As for the
implementation of 2) above, it may be outside your implementation scope,
so feel free to leave it out of your PR; Bill, who originally worked on
the Grouped KIP, can make a follow-up PR for it.

Guozhang

On Fri, Dec 14, 2018 at 9:43 PM Guozhang Wang  wrote:

> Hello Florian,
>
> Really appreciate you for your patience.
>
> I know that we've discussed about the approach to adding overloaded
> functions and rejected it early on. But looking deeper into the current PR
> I realized that this approach has a danger of great API confusions to users
> (I tried to explain my thoughts in the PR, but it was not very clear) ---
> the basic idea is that, today we already have a few existing control
> classes including Grouped, Joined, Suppressed that allow users to specify
> serdes etc, while also a "name" which can then be used to define the
> processor name / internal topic names in the topology (the static function
> names are not consistent, which I think we should fix as well). And Named
> interface, by extending the lambda function interfaces like ValueJoiner /
> Predicate etc opens the door for another way to specify the names again.
>
> So in order to achieve consistency, we are left with generally two options:
>
> 1) only allow users to specify names via the lambda interfaces that
> extends Named interface. This means we'd better remove the naming mechanism
> from the existing control objects to keep consistency.
>
> 2) only allow users to specify names via control classes, and we introduce
> a new class (Named) for those which do not have one yet --- this leads to
> the overloaded functions.
>
> I did a quick count on the num.of overloaded functions, and summing from
> KTable (8) / KStream (15) / KGroupedStream (6) / KGroupedTable (6) /
> TimeWindowedKStream (6) / SessionWindowedKStream (6) we got about 47
> overloaded functions (our guess was pretty close!) -- note this is based on
> John's proposal that we can let existing Grouped / Joined to extend Named
> and hence we only need overloaded functions with a default NamedOperation
> for those operators that do not have a control classes already.
>
> Thinking about this approach, I feel it is not too bad compared with either
> 1) above, which would require us to deprecate a lot of public functions
> around name(), or a mixed naming mechanism, which could lead to very
> confusing behavior for users. Additionally, most users only want to specify
> names for the stateful operations that have internal topics / state stores,
> because they care about upgrade compatibility; for them, the added
> overloads would be rarely used functions anyway. And by letting existing
> control classes extend Named, we can have a unified method name for the
> static constructor as well.
>
>
>
> Guozhang
>
>
> On Fri, Dec 14, 2018 at 10:24 AM John Roesler  wrote:
>
>> Hi Florian,
>>
>> Sorry about the run-around of rejecting the original proposal,
>> only to return to it later on. Hopefully, it's more encouraging
>> than frustrating that we're coming around to your initial way of
>> thinking.
>>
>> Thanks!
>> -John
>>
>> On Thu, Dec 13, 2018 at 4:28 PM Florian Hussonnois
>> wrote:
>>
>> > Hi all,
>> >
>> > Thanks again. I agree with your proposals.
>> > Also, IMHO, overloading all methods (filter, map) to accept a new control
>> > object seems to provide a more natural development experience for users.
>> >
>> > Actually, this was the first proposal for this KIP, but we rejected
>> > it because this solution led to adding a lot of new methods.
>> > As you mentioned, the API has evolved since the creation of this KIP:
>> > some existing control objects already allow customizing internal names.
>> > So we should keep following that strategy.
>> >
>> > If everyone is OK with that, I will update the KIP and the PR
>> > accordingly.
>> >
>> > Thanks.
>> >
>> > On Thu, Dec 13, 2018 at 18:08, John Roesler wrote:
>> >
>> > > Hi again, all,
>> > >
>> > > Matthias, I agree with you.
>
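To make Guozhang's two options concrete, here is a minimal, self-contained Java sketch contrasting where the name lives in each. `NamedPredicate`, `Named`, and `NamingOptions` are hypothetical simplified types invented for this illustration, not the Kafka Streams classes, and `KSTREAM-FILTER-generated` merely stands in for an auto-generated name. The last method only reproduces the overload tally quoted above (8 + 15 + 6 + 6 + 6 + 6 = 47).

```java
import java.util.function.Predicate;

// Hypothetical types for illustration only; these are NOT the Kafka Streams API.

// Option 1: the name travels with the lambda interface itself.
interface NamedPredicate<T> extends Predicate<T> {
    String name();
}

// Option 2: the name travels in a separate control object.
final class Named {
    final String name;

    private Named(String name) { this.name = name; }

    static Named as(String name) { return new Named(name); }
}

final class NamingOptions {
    static final String GENERATED = "KSTREAM-FILTER-generated"; // placeholder for a generated name

    // Option 1: a single method; the name is discovered by inspecting the predicate.
    static String filterOption1(Predicate<String> predicate) {
        return (predicate instanceof NamedPredicate)
                ? ((NamedPredicate<?>) predicate).name()
                : GENERATED;
    }

    // Option 2: the existing method plus an explicit overload taking a Named control object.
    static String filterOption2(Predicate<String> predicate) {
        return GENERATED;
    }

    static String filterOption2(Predicate<String> predicate, Named named) {
        return named.name;
    }

    // Rough overload estimate for option 2, using the per-interface counts quoted in the thread:
    // KTable, KStream, KGroupedStream, KGroupedTable, TimeWindowedKStream, SessionWindowedKStream.
    static int estimatedOverloads() {
        int[] perInterface = {8, 15, 6, 6, 6, 6};
        int total = 0;
        for (int count : perInterface) {
            total += count;
        }
        return total;
    }
}
```

Under option 1 a name can arrive through the lambda while a control class such as Grouped or Joined carries another one, which is the ambiguity Guozhang describes; option 2 keeps a single source of names at the cost of the extra overloads.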

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-09 Thread Guozhang Wang
Hello Florian,

Just checking whether you have read my previous email and whether you are
happy with it. The 2.2 KIP freeze deadline is on the 24th of this month, and
the PR itself is getting quite close, so it would be great if we could reach
agreement on it and get it into the 2.2.0 release.


Guozhang


On Mon, Dec 17, 2018 at 2:39 PM Guozhang Wang  wrote:

> Hi Florian / John,
>
> Just wanted to throw out a couple of minor thoughts on the current proposal:
>
> 1) Regarding the interface / function name, I'd propose we call the
> interface `NamedOperation`, which would be implemented by Produced /
> Consumed / Printed / Joined / Grouped / Suppressed (note I intentionally
> exclude Materialized here since its semantics is quite), and call the
> default class that implements `NamedOperation` `Named`, which would be
> used in the overloaded functions we are adding. The main reason is to have
> consistency in naming.
>
> 2) As a minor tweak, I think it's better to use Joined.name() both for its
> possibly generated repartition topic and for the map processor used for
> group-by (currently this name is only used for the repartition topic).
>
>
> Florian: if you think this proposal makes sense, please feel free to go
> ahead and update the PR; after we make a first pass on it and feel
> confident about it, we can go ahead with the VOTING process. The
> implementation of 2) above may be out of your implementation scope,
> so feel free to leave it outside your PR; Bill, who originally worked
> on the Grouped KIP, can make a follow-up PR for it.
>
> Guozhang
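Point 1) can be sketched in Java as a single interface that every naming-capable config implements, so that all of them share one method name. This is a simplified illustration under assumptions: `Produced.to(...)`, `Consumed.from(...)`, and `NamingUtil` are hypothetical factories and helpers invented here (not the real Kafka Streams signatures), and the fields are package-private only so the example can read them.

```java
// Hypothetical, simplified stand-ins for the proposed design; NOT the Kafka Streams classes.
interface NamedOperation<T extends NamedOperation<T>> {
    // One method name shared by every config that can name an operation.
    T withName(String name);
}

// Default implementation, as would be used by the newly added overloads.
final class Named implements NamedOperation<Named> {
    final String name;

    private Named(String name) { this.name = name; }

    static Named as(String name) { return new Named(name); }

    @Override
    public Named withName(String name) { return new Named(name); }
}

// Existing control classes opt in by implementing the same interface.
final class Produced implements NamedOperation<Produced> {
    final String topic;
    final String name;

    private Produced(String topic, String name) {
        this.topic = topic;
        this.name = name;
    }

    static Produced to(String topic) { return new Produced(topic, null); }

    @Override
    public Produced withName(String name) { return new Produced(topic, name); }
}

final class Consumed implements NamedOperation<Consumed> {
    final String topic;
    final String name;

    private Consumed(String topic, String name) {
        this.topic = topic;
        this.name = name;
    }

    static Consumed from(String topic) { return new Consumed(topic, null); }

    @Override
    public Consumed withName(String name) { return new Consumed(topic, name); }
}

final class NamingUtil {
    // Works for any NamedOperation, e.g. to apply a common prefix to operation names.
    static <T extends NamedOperation<T>> T prefixed(T config, String prefix, String base) {
        return config.withName(prefix + "-" + base);
    }
}
```

Materialized is deliberately left out of the sketch, matching the note in point 1).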
>
> On Fri, Dec 14, 2018 at 9:43 PM Guozhang Wang  wrote:
>
>> Hello Florian,
>>
>> Really appreciate you for your patience.
>>

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-11 Thread Florian Hussonnois
Hi Guozhang,

I have updated the PR as well as the KIP. I still need to add more unit
tests to cover all the new methods.

However, one test is still failing. The reason is that using
Joined.name() for both the potential repartition topic and the processor
nodes breaks topology compatibility.
How should we deal with that?

Thanks,


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-12 Thread Matthias J. Sax
Just catching up on this KIP again.

One nit. The KIP says:

> In addition, the generated names make it hard to guarantee topology
> compatibility. In fact, adding a new operator, using a third-party
> library that optimizes the topology by removing some operators, or upgrading
> to a new KafkaStreams version with internal API changes may change the
> suffix indexing of a large number of processor names. This will in turn
> change the internal state store names as well as the internal topic names.
> 

This is no longer true (I guess it was true when the KIP was
initially proposed), because all stores/internal topics can be named
as of the 2.1 release. I would suggest removing the paragraph.
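For processor names that are still auto-generated, the suffix shift described in the quoted paragraph can be illustrated with a small Java model. It assumes, in simplified form, that generated names are a type prefix plus a zero-padded counter shared by all operators, as in names like `KSTREAM-FILTER-0000000001`; `GeneratedNames` and the operator lists are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of counter-based generated names; not Kafka Streams code.
final class GeneratedNames {
    // Assigns each operator a name from a single counter shared by the whole topology.
    static List<String> assign(List<String> operatorPrefixes) {
        List<String> names = new ArrayList<>();
        int index = 0;
        for (String prefix : operatorPrefixes) {
            names.add(prefix + "-" + String.format("%010d", index++));
        }
        return names;
    }
}
```

In this model, adding one filter upstream renames every later operator (the aggregate moves from `...0002` to `...0003`). An explicit name sidesteps the counter entirely, which is what the KIP offers for processor names.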

Overall, I like the Named/NamedOperation design.

What is unclear to me, though, is why we need new overloads for methods
that accept `Materialized`. To be more precise, I think it makes sense to
add an overload that only takes `Named`, but not one that takes both
`Named` and `Materialized`. For example:

KGroupedStream#count() // exists
KGroupedStream#count(Materialized) // exists
KGroupedStream#count(Named) // added (makes sense to me)
KGroupedStream#count(Named, Materialized) // added -- why?

I would prefer to use `Materialized` to name the processor for this
case, too. Can you elaborate on the motivation?


-Matthias


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-13 Thread Florian Hussonnois
Hi Matthias,

The reason for overloading the methods that take a Materialized parameter
lies in the semantics of this class.
The Materialized class allows naming a queryable store. If a name is set,
it will be used to name both the state store and the changelog topic.
If no name is given, then the provided Named will be used.
This allows naming the operation without having a queryable store.

So if my analysis is correct, we will end up with:

                  | Generated | Named | Joined / Grouped | Materialized
------------------+-----------+-------+------------------+-------------
Node              |     X     |   X   |        X         |
Repartition Topic |     X     |       |        X         |
Queryable Store   |           |       |                  |      X
State Store       |     X     |   X   |        X         |      X
Changelog Topic   |     X     |   X   |        X         |      X
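The Node, State Store, and Changelog Topic rows, read for an operation like `count`, amount to a small fallback rule. The Java sketch below encodes that reading under stated assumptions: a store named via `Named` reuses the Named string verbatim, the generated store name reuses the node's counter purely for brevity, and changelog topics follow the `<application.id>-<store name>-changelog` convention. `CountNaming` and `ResolvedNames` are hypothetical.

```java
// Hypothetical sketch of the naming fallbacks described above; not Kafka Streams code.
final class ResolvedNames {
    final String node;
    final String stateStore;
    final String changelogTopic;
    final boolean queryable;

    ResolvedNames(String node, String stateStore, String changelogTopic, boolean queryable) {
        this.node = node;
        this.stateStore = stateStore;
        this.changelogTopic = changelogTopic;
        this.queryable = queryable;
    }
}

final class CountNaming {
    /**
     * @param applicationId  the application id used as a topic prefix
     * @param namedName      name from a Named argument, or null if none was given
     * @param storeName      name from a Materialized argument, or null if none was given
     * @param generatedIndex counter value used for generated names (assumed shared for brevity)
     */
    static ResolvedNames resolve(String applicationId, String namedName, String storeName, int generatedIndex) {
        String suffix = String.format("%010d", generatedIndex);

        // Node: Named wins; otherwise a generated name (Materialized does not name the node).
        String node = namedName != null ? namedName : "KSTREAM-AGGREGATE-" + suffix;

        // Store: Materialized wins; otherwise fall back to Named; otherwise generated.
        String store;
        if (storeName != null) {
            store = storeName;
        } else if (namedName != null) {
            store = namedName;
        } else {
            store = "KSTREAM-AGGREGATE-STATE-STORE-" + suffix;
        }

        // The changelog topic is derived from whichever store name was chosen.
        String changelog = applicationId + "-" + store + "-changelog";

        // Only an explicit Materialized name yields a queryable store.
        return new ResolvedNames(node, store, changelog, storeName != null);
    }
}
```

Passing both a Named and a Materialized name sets the node from the former and the store from the latter, which is the combination the proposed `count(Named, Materialized)` overload is meant to cover.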


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-15 Thread Matthias J. Sax
While I understand that it should be possible to specify the store name and
the processor name independently of each other, it's still unclear to me
why we cannot use the `Materialized` parameter to specify the processor
name:

> // only set the node name
> #count(Named.as("processorName"));
> 
> // only set the store name
> #count(Materialized.as("storeName"));
> 
> // set both
> #count(Materialized.as("storeName").withName("processorName"));

In this case, it might be good to rename `withName` to
`withProcessorName` to avoid confusion with the store name.

However, why do we need this:

> #count(Materialized.as("storeName"), Named.as("processorName"));

I would prefer not to add this overload.



Strictly speaking, we could also avoid `#count(Named)` and set the processor
name only via:

> #count(Materialized.as(null).withName("processorName"));

I admit it's a little clumsy, but it would save us one more overload.
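Matthias's alternative can be sketched with a hypothetical builder in Java. `MaterializedSketch`, its `withProcessorName` method (the rename he suggests), and `CountSketch` are invented for illustration; they are not existing Kafka Streams classes or methods.

```java
// Hypothetical builder illustrating the alternative above; NOT the real Materialized class.
final class MaterializedSketch {
    final String storeName;     // null means the store name is generated
    final String processorName; // null means the processor name is generated

    private MaterializedSketch(String storeName, String processorName) {
        this.storeName = storeName;
        this.processorName = processorName;
    }

    static MaterializedSketch as(String storeName) {
        return new MaterializedSketch(storeName, null);
    }

    MaterializedSketch withProcessorName(String processorName) {
        return new MaterializedSketch(storeName, processorName);
    }
}

final class CountSketch {
    // One parameter carries both names, so no count(Named, Materialized) overload is needed.
    static String count(MaterializedSketch materialized) {
        String node = materialized.processorName != null ? materialized.processorName : "<generated node>";
        String store = materialized.storeName != null ? materialized.storeName : "<generated store>";
        return "node=" + node + ", store=" + store;
    }
}
```

Calling `CountSketch.count(MaterializedSketch.as(null).withProcessorName("processorName"))` shows the trade-off: naming only the processor needs the `as(null)` form, the clumsiness accepted in exchange for one fewer overload.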



One more comment that I forgot last time: why do we add the getter
`Named#name()`? All other configuration classes only define setters and
we add getters only in the internal implementation.


-Matthias


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-16 Thread John Roesler
Hi Matthias,

One thing that we discussed earlier was to avoid creating ambiguity by
conflating config objects that configure an operation (like Grouped) with
config objects that configure an aspect of the operation (like
Materialized).

It is natural for the Grouped config to extend Named, as doing so indicates
that grouping operations can be named (i.e., the name applies to the
operation itself, which in turn makes it reasonable to use the operation's
name as a component in the related processors' and topics' names).

But what would it mean for Materialized to extend Named? Materialized only
configures the materialization of an operation's result, not the operation
itself, so I guess it would mean the name applies to the result of the
operation? It doesn't really work.

Adding config objects to the DSL was an attempt to avoid overload bloat as
more aspects of operations need to be configured.
However, we made a mistake with Materialized, since (as noted) it doesn't
configure the operation itself, but just one aspect of it.
We basically bagged a bunch of parameters into one, without solving the
problem structurally, and this is the result:
As soon as we need to configure a *different* aspect of the operation, we
again need to add a new overload, and the cycle begins again.

The proper solution here is to add an eponymous config object to each
stateful operation, one which mixes in or composes the Materialized aspect
config and the Named aspect config. But this is a large API change, and we
decided on the middle ground of just adding Named as an optional parameter
via new overloads for now.
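The "proper solution" described above, one config object per stateful operation that composes the materialization and naming aspects, might look roughly like the Java sketch below. `Counted`, `NamingAspect`, `StoreAspect`, and `CountOperator` are hypothetical names chosen for this illustration; the thread explicitly decided against this larger change for now.

```java
// Hypothetical sketch of a per-operation config composing separate aspect configs; not Kafka Streams code.
final class NamingAspect {
    final String name;

    NamingAspect(String name) { this.name = name; }
}

final class StoreAspect {
    final String storeName;

    StoreAspect(String storeName) { this.storeName = storeName; }
}

// "Eponymous" config for a count operation: one object that composes independent aspects.
final class Counted {
    final NamingAspect naming; // null if the operation is left unnamed
    final StoreAspect store;   // null if the store name is left to defaults

    private Counted(NamingAspect naming, StoreAspect store) {
        this.naming = naming;
        this.store = store;
    }

    static Counted defaults() { return new Counted(null, null); }

    Counted withName(String name) { return new Counted(new NamingAspect(name), store); }

    Counted withStoreName(String storeName) { return new Counted(naming, new StoreAspect(storeName)); }
}

final class CountOperator {
    // One signature: configuring another aspect later means adding a field to Counted,
    // not adding another overload of count().
    static String count(Counted config) {
        String node = config.naming != null ? config.naming.name : "<generated node>";
        String storeName = config.store != null ? config.store.storeName : "<generated store>";
        return node + "/" + storeName;
    }
}
```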

A similar compromise was to go ahead and add a Named overload directly to
all the operators that currently have no config object.
Again, the proper thing would be to add a new config class for each
individual operation, but it seemed like a drastic change.
We basically said that right now, we don't think we'll ever need to
configure any aspect of those operators other than the name, and we're
acknowledging that if we do, we'll have created a small mess to clean up.
It's really just a generalization of the same problem with Materialized
operations.

To answer your question about the Named interface:
The primary reason is that Named is an aspect that is meant to be mixed in
with other config objects.
For example, Grouped can extend Named.
If we followed the pattern you've referenced, we would have a public
interface Named with only the setter and a private class NamedInternal with
the setter and getter.
But would Grouped be a subclass of NamedInternal?
Then, we could only have one kind of aspect mixin, since Java doesn't have
multiple class inheritance, or we'd have to decide if the next thing should
be a superclass of Named or a subclass of Named and a superclass of Grouped.
Plus, in the implementation, instead of just casting Grouped to
GroupedInternal (which is already unclean design), we'd also be casting
Grouped to NamedInternal, which is super confusing.

It's far cleaner all around just to use the type system "the right way",
which is what we've proposed.
Any config class can mix in the Named aspect, and it inherits a contract to
supply both the setter and the getter.
Our implementation can actually avoid any casting in this usage, since we
can just call grouped.name() to get the name, instead of something like
((NamedInternal) grouped).name().
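The mixin argument can be illustrated with interfaces that carry both the fluent setter and the getter, so that one config class can combine several aspects (Java allows many interfaces but only one superclass) and the implementation can read the name without a cast. `NamedAspect`, `KeySerdeAspect`, `GroupedConfig`, and `TopologySketch` are hypothetical, and the `<application.id>-<name>-repartition` topic format is assumed here as the naming convention.

```java
// Hypothetical sketch of aspect mixins with getters; NOT the Kafka Streams API.
interface NamedAspect<T> {
    T withName(String name); // setter-style, returns a new config
    String name();           // getter, part of the same public contract
}

interface KeySerdeAspect<T> {
    T withKeySerde(String keySerde);
    String keySerde();
}

// A config class can mix in several aspects because they are interfaces, not superclasses.
final class GroupedConfig implements NamedAspect<GroupedConfig>, KeySerdeAspect<GroupedConfig> {
    private final String name;
    private final String keySerde;

    private GroupedConfig(String name, String keySerde) {
        this.name = name;
        this.keySerde = keySerde;
    }

    static GroupedConfig create() { return new GroupedConfig(null, null); }

    @Override public GroupedConfig withName(String name) { return new GroupedConfig(name, keySerde); }
    @Override public String name() { return name; }
    @Override public GroupedConfig withKeySerde(String keySerde) { return new GroupedConfig(name, keySerde); }
    @Override public String keySerde() { return keySerde; }
}

final class TopologySketch {
    // Accepts any config that mixes in the naming aspect; no cast to an internal class is needed.
    static String repartitionTopic(String applicationId, NamedAspect<?> operation) {
        return applicationId + "-" + operation.name() + "-repartition";
    }
}
```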

Plus, what harm does it do to let people get back the configuration
property that they *just set* on the config object?
It doesn't break encapsulation.
It would certainly make writing tests a lot easier for everyone.

All around, I would advocate for moving toward this design for all the
config interfaces, as I've previously demonstrated how we've made an
intractable mess out of the window config hierarchy by trying to be clever
and hiding the getters.

I hope this helps,
-John



Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-16 Thread Matthias J. Sax
Thanks for the details John.

While I understand your argument that it is not optimal to use
`Materialized` to set the processor name, I still slightly prefer this
option, because adding more overloads seems even worse to me.

But I would also not block this KIP if the majority of people prefer to
add overloads instead of extending `Materialized`.


However, I cannot follow your argument about the `NamedOperation#name()`
getter method. So far, no configuration class has getters, and
it seems inconsistent to add a single one now. We also don't need
any cast IMHO, as we would use the same construct as we do for all other
config classes, via `NamedInternal`, to access the name:

> final String name = new NamedInternal(named).name();

Maybe it would have been better to add getters from the beginning
(even if I think it was the right decision not to add getters). However,
that ship has sailed, and if we want to add getters to avoid the
`XxxInternal()` construct, we should do it for all classes -- but
what would a user gain if we did this? It would just be a lot of "noise"
IMHO.
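For comparison, the `XxxInternal` construct referred to here (as in `new NamedInternal(named).name()`) keeps the getter out of the public class. A simplified Java sketch, with hypothetical `NameOnlyConfig` / `NameOnlyConfigInternal` classes standing in for the real ones:

```java
// Hypothetical, simplified sketch of the public-config / internal-subclass pattern; not Kafka code.
class NameOnlyConfig {
    protected final String name; // visible to subclasses, but there is no public getter

    protected NameOnlyConfig(String name) { this.name = name; }

    // Copy constructor so an internal subclass can wrap a user-supplied instance.
    protected NameOnlyConfig(NameOnlyConfig other) { this(other.name); }

    static NameOnlyConfig as(String name) { return new NameOnlyConfig(name); }

    NameOnlyConfig withName(String name) { return new NameOnlyConfig(name); }
}

// Internal view used by the library implementation to read the configured value.
final class NameOnlyConfigInternal extends NameOnlyConfig {
    NameOnlyConfigInternal(NameOnlyConfig config) { super(config); }

    String name() { return name; }
}
```

The trade-off against the interface-with-getter design: users cannot read the name back from the public type, which keeps the public surface minimal, but every read site inside the library needs the wrapper.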


@Florian: I would suggest starting a VOTE if you want to get this into the
2.2 release. The open questions seem to be minor, and I think we can
resolve them in parallel with the vote.



-Matthias


On 1/16/19 12:59 PM, John Roesler wrote:
> Hi Matthias,
> 
> One thing that we discussed earlier was to avoid creating ambiguity by
> conflating config objects that configure an operation (like Grouped) with
> config objects that configure an aspect of the operation (like
> Materialized).
> 
> It is natural for the Grouped config to extend Named, as doing so indicates
> that grouping operations can be named (i.e., the name applies to the
> operation itself, which in turn makes it reasonable to use the operation's
> name as a component in the related processors' and topics' names).
> 
> But what would it mean for Materialized to extend Named? Materialized only
> configures the materialization of an operation's result, not the operation
> itself, so I guess it would mean the name applies to the result of the
> operation? It doesn't really work.
> 
> Adding config objects to the DSL was an attempt to avoid overload bloat as
> more aspects of operations need to be configured.
> However, we made a mistake with Materialized, since (as noted) it doesn't
> configure the operation itself, but just one aspect of it.
> We basically bagged a bunch of parameters into one, without solving the
> problem structurally, and this is the result:
> As soon as we need to configure a *different* aspect of the operation, we
> again need to add a new overload, and the cycle begins again.
> 
> The proper solution here is to add an eponymous config object to each
> stateful operation, one which mixes in or composes the Materialized aspect
> config and the Named aspect config. But this is a large API change, and we
> decided on the middle ground of just adding Named as an optional parameter
> via new overloads for now.
> 
> A similar compromise was to go ahead and add a Named overload directly to
> all the operators that currently have no config object.
> Again, the proper thing would be to add a new config class for each
> individual operation, but it seemed like a drastic change.
> We basically said that right now, we don't think we'll ever need to
> configure any aspect of those operators other than the name, and we're
> acknowledging that if we do, we'll have created a small mess to clean up.
> It's really just a generalization of the same problem with Materialized
> operations.
> 
> To answer your question about the Named interface:
> The primary reason is that Named is an aspect that is meant to be mixed in
> with other config objects.
> For example, Grouped can extend Named.
> If we followed the pattern you've referenced, we would have a public
> interface Named with only the setter and a private class NamedInternal with
> the setter and getter.
> But would Grouped be a subclass of NamedInternal?
> Then, we could only have one kind of aspect mixin, since Java doesn't have
> multiple class inheritance, or we'd have to decide if the next thing should
> be a superclass of Named or a subclass of Named and a superclass of Grouped.
> Plus, in the implementation, instead of just casting Grouped to
> GroupedInternal (which is already unclean design), we'd also be casting
> Grouped to NamedInternal, which is super confusing.
> 
> It's far cleaner all around just to use the type system "the right way",
> which is what we've proposed.
> Any config class can mix in the Named aspect, and it inherits a contract to
> supply both the setter and the getter.
> Our implementation can actually avoid any casting in this usage, since we
> can just call grouped.name() to get the name, instead of something like
> ((NamedInternal) grouped).name().
> 
> Plus, what harm does it do to let people get back the configuration
> property that they *just set* on the config objec
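John's alternative, in which `Named` is a mixin aspect carrying both a setter and a getter, could look roughly like the sketch below. All names and bodies are hypothetical (the thread later settled on a different layout); the point is only that builder code can read the name from any config object through the interface, with no cast and no internal class.

```java
// Hypothetical sketch of the "getter on the mixin interface" design
// discussed (and later dropped) in this thread.
interface NamedOperation<T extends NamedOperation<T>> {
    T withName(String name); // fluent setter

    String name();           // getter, part of the public contract
}

final class Grouped implements NamedOperation<Grouped> {
    private final String name; // other grouping settings (serdes, ...) omitted

    private Grouped(final String name) {
        this.name = name;
    }

    public static Grouped as(final String name) {
        return new Grouped(name);
    }

    @Override
    public Grouped withName(final String name) {
        return new Grouped(name); // immutable: return a modified copy
    }

    @Override
    public String name() {
        return name;
    }
}

final class BuilderSketch {
    // Works for any config object that mixes in NamedOperation; no cast needed.
    static String processorName(final NamedOperation<?> config, final String generated) {
        final String name = config.name();
        return name != null ? name : generated;
    }
}
```

With this design, `BuilderSketch.processorName(Grouped.as("group-by-user"), "generated-1")` returns `"group-by-user"` and falls back to the generated name when none was set. The trade-off Matthias and Bill raise is that it adds a public getter that no other config class has.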

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread Bill Bejeck
I'm getting caught up with the current state of this KIP.

I agree that the question on what to do with overloads is a difficult one
to answer.

Both John and Matthias have laid out their thoughts thoroughly, and the
points made by both resonate with me.

I've spent some time thinking about this, and while I have a problem with
adding overloaded methods, I can't quite get comfortable with the notion of
Materialized naming the processing node.  For me, it comes down to the fact
that Materialized is used to configure the state store for an individual
processing node and knows nothing of the operation itself. So I'll go with
adding the Named overload to methods taking a Materialized by a narrow
margin.
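Adding the `Named` overload keeps the two aspects in separate parameters: `Named` names the operation, while `Materialized` only configures the store. A minimal sketch, assuming simplified hypothetical classes and a made-up generated-name scheme (`COUNT-<n>`, `COUNT-STORE-<n>`), shows that a user can then name the processor without also having to name the store.

```java
// Hypothetical, simplified stand-ins; not the Kafka Streams classes.
final class Named {
    final String name;

    private Named(final String name) {
        this.name = name;
    }

    static Named as(final String name) {
        return new Named(name);
    }
}

final class Materialized {
    final String storeName; // null means "no user-provided store name"

    private Materialized(final String storeName) {
        this.storeName = storeName;
    }

    static Materialized as(final String storeName) {
        return new Materialized(storeName);
    }
}

// Names chosen for the processor and its state store.
final class CountResult {
    final String processorName;
    final String storeName;

    CountResult(final String processorName, final String storeName) {
        this.processorName = processorName;
        this.storeName = storeName;
    }
}

final class GroupedStreamSketch {
    private int counter = 0; // stands in for the builder's name generator

    // Each aspect has its own parameter, so neither one overrides the other.
    CountResult count(final Named named, final Materialized materialized) {
        final int id = ++counter;
        final String processor = named != null ? named.name : "COUNT-" + id;
        final String store = materialized != null && materialized.storeName != null
                ? materialized.storeName
                : "COUNT-STORE-" + id;
        return new CountResult(processor, store);
    }

    CountResult count() {
        return count(null, null);
    }

    CountResult count(final Named named) {
        return count(named, null);
    }

    CountResult count(final Materialized materialized) {
        return count(null, materialized);
    }
}
```

For example, `new GroupedStreamSketch().count(Named.as("count-clicks"))` yields the processor name `count-clicks` and the generated store name `COUNT-STORE-1`, with no `Materialized.as(null)` workaround needed.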

As for the name method, I agree with Matthias that it's not consistent with
the approach we've taken so far, for better or worse, but to quote
Matthias, "that ship has sailed." IMHO, making testing easier doesn't
justify adding the method, as there are ways to get the name via the
NamedInternal class.

Just my 2 cents.

Thanks,
Bill

On Wed, Jan 16, 2019 at 5:40 PM Matthias J. Sax 
wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread Florian Hussonnois
Thanks for your feedback. I will remove the name() method from the
NamedOperation interface.
After a first look, I will need to introduce a new JoinedInternal class.

On Thu, Jan 17, 2019 at 19:09, Bill Bejeck wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread John Roesler
Just to chime in regarding NamedInternal. That was my bad mental model to
blame. It is indeed coercion, not casting. Even more relevant, I'm not a
fan of the XInternal pattern, but it is the pattern we have. It would be
worse to start carving out exceptions.

So I agree that we should have:
* `NamedOperation` interface, declaring only the `withName(String)` setter
member
* `Named implements NamedOperation` class with a protected `name` field,
set by the `withName` setter (and also other config objects would do the
same, e.g., `Grouped implements NamedOperation`)
* `NamedInternal extends Named` class with a public (but internally
targeted) `name()` getter to expose the name to the topology builder.
Likewise all the other config classes that implement NamedOperation would
expose a `name()` getter for the same purpose. It's not in the public API,
but we should take care to make sure the getter method has the same name
everywhere for minimum confusion.
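The layout summarized above could be sketched as follows. Class and method names follow the thread; the bodies, the choice to have `withName` return a new immutable copy, and the omission of serdes and other settings are assumptions made to keep the sketch small and self-contained.

```java
// Hypothetical sketch of the layout agreed in this thread; names follow the
// thread, bodies are illustrative only (serdes and other settings omitted).

// Public API: the mixin interface declares only the setter.
interface NamedOperation<T extends NamedOperation<T>> {
    T withName(String name);
}

// Public default implementation, used by the new Named overloads.
class Named implements NamedOperation<Named> {
    protected final String name;

    protected Named(final String name) {
        this.name = name;
    }

    protected Named(final Named named) {
        this(named.name);
    }

    public static Named as(final String name) {
        return new Named(name);
    }

    @Override
    public Named withName(final String name) {
        return new Named(name); // immutable: return a modified copy
    }
}

// Another public config object mixing in the same aspect.
class Grouped implements NamedOperation<Grouped> {
    protected final String name;

    protected Grouped(final String name) {
        this.name = name;
    }

    protected Grouped(final Grouped grouped) {
        this(grouped.name);
    }

    public static Grouped as(final String name) {
        return new Grouped(name);
    }

    @Override
    public Grouped withName(final String name) {
        return new Grouped(name);
    }
}

// Internal views: public getters with the same method name everywhere,
// but not part of the public API.
class NamedInternal extends Named {
    NamedInternal(final Named named) {
        super(named);
    }

    public String name() {
        return name;
    }
}

class GroupedInternal extends Grouped {
    GroupedInternal(final Grouped grouped) {
        super(grouped);
    }

    public String name() {
        return name;
    }
}
```

Internal code then reads names uniformly, e.g. `new GroupedInternal(grouped).name()`, while the public `Named` and `Grouped` classes expose only the `withName` setter.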

Thanks, everyone!
-John

On Thu, Jan 17, 2019 at 12:09 PM Bill Bejeck  wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread Florian Hussonnois
Sorry, I sent my previous mail too quickly. Unlike the Consumed, Produced
and Grouped classes, the Joined class does have getter methods, so I
propose keeping the name() method only for this class.
For the other classes, the name will be accessible through the XXXInternal classes.

On Thu, Jan 17, 2019 at 22:39, John Roesler wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread Bill Bejeck
Sounds good to me.

Thanks,
Bill

On Thu, Jan 17, 2019 at 4:43 PM Florian Hussonnois 
wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread Guozhang Wang
Wow, that's a lot of discussion in 6 days! :) Just catching up and sharing
my two cents here:

1. Materialized: I'm also inclined not to let Materialized extend Named, and
to add the overload instead. All the rationales have been very well
summarized before. Just to emphasize John's points: Materialized is
considered the control object that the optimization framework leverages
to determine whether the state store should be physically materialized
or not. So let's say the user does not want to query the store (hence it
can just be locally materialized) but still wants to name the processor:
they would need to write either
"count(Materialized.as(null).withName("processorName"));" or
"count(Named.as("processorName"));", and both are a bit hard to explain to
users. Hence an overloaded function with two parameters seems easier to
understand.

2. As for `NamedOperation`: I've left a comment about it before, i.e. "1)
Regarding the interface / function name, I'd propose we call the interface
`NamedOperation` which would be implemented by Produced / Consumed /
Printed / Joined / Grouped / Suppressed (note I intentionally exclude
Materialized here since its semantics is quite), and have the default class
that implements `NamedOperation` as `Named`, which would be used in our
adding overload functions. The main reason is to have consistency in
naming." And I think I'm on the same page as John and his more detailed
proposal.

3. As for `Joined`: I would actually suggest we bite the bullet and remove
its `name()` getter as well, because we are trying to fix some
inconsistencies in this KIP anyway (or is that not agreed upon yet?). My
thought was that we would have the following breaking renamings:

3.a) static Joined#named() -> Joined#as()
3.b) Joined#name() -> "deleted"
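The two renames above could look roughly like this. This is a sketch under the assumption that `Joined` adopts the same `NamedOperation`/`XxxInternal` layout as the other config classes; the type parameters, serde fields, and serde getters of the real `Joined` are omitted, and `JoinedInternal` is the class Florian mentions having to introduce.

```java
// Hypothetical sketch of Joined after the proposed renames;
// type parameters and serdes omitted.
interface NamedOperation<T extends NamedOperation<T>> {
    T withName(String name);
}

class Joined implements NamedOperation<Joined> {
    protected final String name;

    protected Joined(final String name) {
        this.name = name;
    }

    protected Joined(final Joined joined) {
        this(joined.name);
    }

    // 3.a) replaces the old static factory Joined#named(String)
    public static Joined as(final String name) {
        return new Joined(name);
    }

    @Override
    public Joined withName(final String name) {
        return new Joined(name);
    }

    // 3.b) no public name() getter any more
}

// Internal view used by the topology builder to read the name.
class JoinedInternal extends Joined {
    JoinedInternal(final Joined joined) {
        super(joined);
    }

    public String name() {
        return name;
    }
}
```

Compared with Florian's proposal of keeping `Joined#name()` public, this makes `Joined` consistent with the other config classes at the cost of a breaking change.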


I also think that we can start the voting thread asap, since we are
reaching consensus and the KIP deadline is approaching. The wiki page
itself may still need to be updated with the API-breaking changes above,
though.


Guozhang


On Thu, Jan 17, 2019 at 1:43 PM Florian Hussonnois 
wrote:

> Sorry, I've sent my previous mail to quickly. Unlike the Consumed, Produced
> and Grouped classes, the Joined class does have getter methods. So I
> propose to keep the name() method only for this class.
> For other classes the name will be accessible through XXXInternal classes.
>
> Le jeu. 17 janv. 2019 à 22:39, John Roesler  a écrit :
>
> > Just to chime in regarding NamedInternal. That was my bad mental model to
> > blame. It is indeed coercion, not casting. Even more relevant, I'm not a
> > fan of the XInternal pattern, but it is the pattern we have. It would be
> > worse to start carving out exceptions.
> >
> > So I agree that we should have:
> > * `NamedOperation` interface, declaring only the `withName(String)`
> > setter member
> > * `Named implements NamedOperation` class with a protected `name` field,
> > set by the `withName` setter (and also other config objects would do the
> > same, e.g., `Grouped implements NamedOperation`)
> > * `NamedInternal extends Named` class with a public (but internally
> > targeted) `name()` getter to expose the name to the topology builder.
> > Likewise all the other config classes that implement NamedOperation would
> > expose a `name()` getter for the same purpose. It's not in the public API,
> > but we should take care to make sure the getter method has the same name
> > everywhere for minimum confusion.
> >
> > Thanks, everyone!
> > -John
> >
> > On Thu, Jan 17, 2019 at 12:09 PM Bill Bejeck  wrote:
> >
> > > I'm getting caught up with the current state of this KIP.
> > >
> > > I agree that the question of what to do with overloads is a difficult
> > > one to answer.
> > >
> > > Both John and Matthias have laid out their thoughts thoroughly, and the
> > > points made by both resonate with me.
> > >
> > > I've spent some time thinking about this, and while I have a problem
> > > with adding overloaded methods, I can't quite get comfortable with the
> > > notion of Materialized naming the processing node. For me, it comes down
> > > to the fact that Materialized is used to configure the state store for an
> > > individual processing node and knows nothing of the operation itself. So
> > > I'll go with adding the Named overload to methods taking a Materialized,
> > > by a narrow margin.
> > >
> > > As for the name method, I agree with Matthias that it's not consistent
> > > with the approach we've taken so far, whether for better or worse, but to
> > > quote Matthias, "that ship has sailed." IMHO, making testing easier
> > > doesn't justify adding the method, as there are ways to get the name via
> > > the NamedInternal class.
> > >
> > > Just my 2 cents.
> > >
> > > Thanks,
> > > Bill
> > >
> > > On Wed, Jan 16, 2019 at 5:40 PM Matthias J. Sax
> > > wrote:
> > >
> > > > Thanks for the details John.
> > > >
> > > > While I understand your argument that it is no opti
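The class structure John describes can be sketched in a few lines of Java. This is a hypothetical, simplified model, not the actual Kafka source: the public classes expose only the `withName(String)` setter, and the internal subclasses wrap a user-supplied instance through a copy constructor to expose a `name()` getter. That wrapping is the "coercion, not casting" point: a downcast of the user's `Named` would fail at runtime, because the user never created a `NamedInternal`.

```java
// Public interface: declares only the setter (self-typed so it returns the concrete class).
interface NamedOperation<T extends NamedOperation<T>> {
    T withName(String name);
}

// Public config class: the name lives in a protected field, with no public getter.
class Named implements NamedOperation<Named> {
    protected String name;

    protected Named(String name) { this.name = name; }

    static Named as(String name) { return new Named(name); }

    // Setter: stores the name and returns this instance for chaining.
    @Override
    public Named withName(String name) {
        this.name = name;
        return this;
    }
}

// Internal class: coerces a user-supplied Named (copy, not cast) and exposes the getter.
class NamedInternal extends Named {
    NamedInternal(Named named) { super(named.name); }

    public String name() { return name; }
}

// Another config object following the same pattern, e.g. Grouped.
class Grouped implements NamedOperation<Grouped> {
    protected String name;

    protected Grouped(String name) { this.name = name; }

    static Grouped as(String name) { return new Grouped(name); }

    @Override
    public Grouped withName(String name) {
        this.name = name;
        return this;
    }
}

// Same getter name, name(), as in NamedInternal, to keep the internal API uniform.
class GroupedInternal extends Grouped {
    GroupedInternal(Grouped grouped) { super(grouped.name); }

    public String name() { return name; }
}
```

The design keeps the public surface write-only, while the topology builder reads names through the `XInternal` wrappers that all share the same `name()` getter.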

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread Matthias J. Sax
Thanks for all the follow-up comments!

As I mentioned earlier, I am OK with adding overloads instead of using
Materialized to specify the processor name. It seems this is what the
majority of people prefer.

I am also +1 on Guozhang's suggestion to deprecate `static
Joined#named()` and replace it with `static Joined#as()` for consistency,
and to deprecate the getter `Joined#name()` for removal and introduce
`JoinedInternal` to access the name.
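A minimal Java sketch of that deprecation path, assuming a heavily simplified `Joined` (the real class also carries serdes, which are omitted, and every name below is illustrative): the new `as()` factory matches `Named.as()` and the other config objects, the old `named()` factory and `name()` getter remain for compatibility but are deprecated, and the DSL would read the name through `JoinedInternal` instead.

```java
// Simplified stand-in for Joined; key/value serdes are omitted for brevity.
class Joined<K, V, VO> {
    protected final String name;

    protected Joined(String name) { this.name = name; }

    // New factory, consistent with the other config objects.
    static <K, V, VO> Joined<K, V, VO> as(String name) {
        return new Joined<>(name);
    }

    // Old factory, kept for compatibility and delegating to as().
    @Deprecated
    static <K, V, VO> Joined<K, V, VO> named(String name) {
        return as(name);
    }

    // Old public getter, deprecated for removal.
    @Deprecated
    public String name() {
        return name;
    }
}

// Internal view used by the topology builder instead of the deprecated public getter.
class JoinedInternal<K, V, VO> extends Joined<K, V, VO> {
    JoinedInternal(Joined<K, V, VO> joined) {
        super(joined.name);
    }

    @Override
    public String name() {
        return name;
    }
}
```

Existing code calling `Joined.named(...)` keeps compiling (with a deprecation warning), which is the difference from outright deleting the methods.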

@Guozhang: the vote is already up :)


-Matthias


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-17 Thread Bill Bejeck
+1 from me on Guozhang's proposal for the changes to Joined.

Thanks,
Bill


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-01-25 Thread Matthias J. Sax
I was reading the KIP again, and there are still some open questions and
inconsistencies:

For example, for `KGroupedStream#count(Named)` the KIP says that only
the processor will be named, while the state store name will be `PREFIX
+ COUNT` (i.e., an auto-generated name). Additionally, for
`KGroupedStream#count(Named, Materialized)` the processor will be named
according to `Named` and the store will be named according to
`Materialized.as()`. So far so good. It implies that naming the
processor and naming the store are independent. (This pattern is applied
to all aggregation functions, for KStream and KTable.)

However, for `KTable#filter(Predicate, Named)` the KIP says that both the
processor name and the store name are set. This sounds wrong (i.e.,
inconsistent with the first paragraph above), because there is also
`KTable#filter(Predicate, Named, Materialized)`. Also note that for the
first operator, the store might not be materialized at all. (This
issue applies to all KTable operators, stateless and stateful.)
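To make the consistency argument concrete, here is a hypothetical Java sketch of the rule the first paragraph implies, applied to `KTable#filter`. None of these classes are Kafka's, and the predicate is accepted but not evaluated, since only topology naming is modelled: `Named` only ever names the processor, a store exists only when `Materialized` is passed, and its name comes from `Materialized` alone, never derived from the node name.

```java
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-ins for the config objects.
final class Named {
    final String name;

    private Named(String name) { this.name = name; }

    static Named as(String name) { return new Named(name); }
}

final class Materialized {
    final String storeName;

    private Materialized(String storeName) { this.storeName = storeName; }

    static Materialized as(String storeName) { return new Materialized(storeName); }
}

// The node a filter() call would add: a processor name and, optionally, a store name.
final class FilterNode {
    final String processorName;
    final Optional<String> storeName; // empty means the result is not materialized

    FilterNode(String processorName, Optional<String> storeName) {
        this.processorName = processorName;
        this.storeName = storeName;
    }
}

final class TableSketch<V> {
    // Naming the processor neither creates nor names a store.
    FilterNode filter(Predicate<V> predicate, Named named) {
        return new FilterNode(named.name, Optional.empty());
    }

    // The store name is taken from Materialized only, independent of the processor name.
    FilterNode filter(Predicate<V> predicate, Named named, Materialized materialized) {
        return new FilterNode(named.name, Optional.of(materialized.storeName));
    }
}
```

Under this rule the two `filter` overloads behave exactly like the two `count` overloads: the extra `Materialized` argument is the only thing that introduces, and names, a store.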

Finally, there is the following statement in the KIP:

> Also, note that for all methods accepting a Materialized argument, if no
> state store name is provided, then the node name will be used to generate
> one. The state store name will be the node name suffixed with "-table".


This contradicts the non-naming of stores from the very beginning (i.e.,
that `Named` alone never names a store).


Also, the KIP still contains the question about `join(GlobalKTable,
KeyValueMapper, ValueJoiner)` and `leftJoin(GlobalKTable,
KeyValueMapper, ValueJoiner)`. I think a consistent approach would be to
add one overload each that takes a `Named` parameter.


Thoughts?


-Matthias



Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-05 Thread Florian Hussonnois
Hi Matthias,

Regarding your feedback, I've updated the KIP and PR so that state
stores are only named according to the provided Materialized.
I have also overloaded the methods `join(GlobalKTable, KeyValueMapper,
ValueJoiner)` and `leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)`.

(Sorry for my late reply; I've had some difficulty finding time to work
on the KIP over the last few weeks.)


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-20 Thread Matthias J. Sax
Florian,

thanks for updating the KIP (and no worries about the late reply -- the
2.2 release kept us busy anyway...). Overall LGTM.

Just some nits:


KStream-Table:

Do we need to list the existing stream-globalTable join methods in the
first table? (I thought it should only contain new/changing methods.)

typo: `join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named)`

`leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)` is missing the new
`Named` parameter.

`static Joined#named(final String name)`
 -> should be `#as(...)` instead of `named(...)`

flatTransform() is missing (cf. KIP-313)



KTable-table:

`Suppressed#withName(String)`
 -> should we change this to `#as(...)` too (similar to `named()`)?



-Matthias




Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-21 Thread Bill Bejeck
Hi Florian,

Overall, the KIP LGTM. Once you've addressed the final comments from
Matthias, I think we can put this up for a vote.

Thanks,
Bill


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-05-29 Thread Bill Bejeck
Hi Florian,

Thanks for the KIP.  I think being able to add more context to the
processor names would be useful.

I like the idea of adding a "withProcessorName" to Produced, Consumed and
Joined.

But instead of adding the "Processed" parameter to a large percentage of
the methods, which would result in more overloaded methods (which we removed
quite a bit of with KIP-182), what do you think of adding a method
to the AbstractStream class "withName(String processorName)"? BTW I'm not
married to the method name; it's the best I can do off the top of my head.

For the methods that return void, we'd have to add a parameter, but that
would at least cut down on the number of overloaded methods in the API.

Just my 2 cents.

Thanks,
Bill

On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois 
wrote:

> Hi,
>
> I would like to start a new discussion on the following KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>
> This is still a draft.
>
> Looking forward to your feedback.
> --
> Florian HUSSONNOIS
>


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-05-30 Thread Guozhang Wang
Hello Florian,

Thanks for the KIP. I have some meta feedback on the proposal:

1. You mentioned that this `Processed` object will be added to a new
overloaded variant of all the stateless operators. What about the stateful
operators? I would like to hear your opinion if you have thought about that.
Note that stateful operators are usually mapped to multiple
processor nodes, so we probably need to come up with some way to
define all their names.

2. I share Bill's concern about adding lots of new overloaded
functions to the stateless operators, as we have just spent quite some
effort trimming them since the 1.0.0 release. If the goal is just to provide
some "hints" for the generated processor node names, rather than strictly
enforcing the exact names to be generated, then how about we just add a new
function to the `KStream` and `KTable` classes like "as(Processed)", with the
semantics "the latest operator that generated this KStream / KTable will
be named according to this hint".

The only caveat is that this alternative would not work for operators like
`KStream#to` and `KStream#print` that return void. The current operators of
that kind are:

a. KStream#print,
b. KStream#foreach,
c. KStream#to,
d. KStream#process

I personally feel that, except for `KStream#process`, users would not usually
bother to override their names, and for `KStream#process` we could add an
overloaded variant with the additional Processed object.


3. In your example, the processor names still have a suffix like "-00".
Is this intentional? If yes, why? (I thought that with user-specified
processor name hints we would no longer add a suffix to distinguish
different nodes of the same type.)


Guozhang


-- 
-- Guozhang


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-05-31 Thread Damian Guy
Hi Florian,

Thanks for the KIP. What about KTable and other DSL interfaces? Will they
not want to be able to do the same thing?
It would be good to see a complete set of the public API changes.

Cheers,
Damian



Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-05-31 Thread Florian Hussonnois
Hi,
Thank you very much for your feedback.

1/
I agree that overloading most of the methods with a Processed is not ideal.
I've started modifying the KStream API and came to the same conclusion.
Also, adding a new method directly to the KStreamImpl and KTableImpl classes
seems to be a better option.

However, a processor name cannot be redefined after calling an operator (or
maybe I'm missing something in the code).
From my understanding, this would only set the KStream name property, not the
processor name previously added to the topology builder, leading to an
InvalidTopology exception.

So the new method should actually define the name of the next processor.
Below is an example:

stream.as(Processed.name("MAP_TO_UPPERCASE"))
      .map((k, v) -> KeyValue.pair(k, v.toUpperCase()));

I think this approach could also solve the case of methods returning void?

Regarding this new method, we have two possible implementations:

   1. Adding a method like: withName(String processorName)
   2. or adding a method accepting a Processed object: as(Processed).

I think solution 2. is preferable, as the Processed class could be enriched
further (in the future).
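A minimal, self-contained sketch of the "name the next processor" semantics described in 1/ follows. It is a toy model, not the Kafka Streams API: `NameNextTopology`, `NameNextStream`, and the `as(String)` method are hypothetical names, and the generated-name format only imitates the `KSTREAM-<TYPE>-<index>` style.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Kafka Streams: as(name) records a pending name that is
// consumed by the *next* operator added to the topology.
final class NameNextTopology {
    final List<String> processorNames = new ArrayList<>();
    private int index = 0;

    void addProcessor(String type, String userName) {
        // Fall back to a generated, index-suffixed name when none was given.
        String name = userName != null
                ? userName
                : String.format("KSTREAM-%s-%010d", type, index);
        index++;
        processorNames.add(name);
    }
}

final class NameNextStream {
    private final NameNextTopology topology;
    private final String pendingName; // applied to the next operator, may be null

    NameNextStream(NameNextTopology topology, String pendingName) {
        this.topology = topology;
        this.pendingName = pendingName;
    }

    // Hypothetical as(...): names the processor created by the next call.
    NameNextStream as(String name) {
        return new NameNextStream(topology, name);
    }

    NameNextStream map() {
        topology.addProcessor("MAP", pendingName);
        return new NameNextStream(topology, null);
    }

    // A void operator can be named too, because the name was supplied first.
    void to() {
        topology.addProcessor("SINK", pendingName);
    }
}
```

For example, `new NameNextStream(topology, null).map().as("MAP_TO_UPPERCASE").map().as("WRITE_OUTPUT").to()` records one generated name followed by the two user-supplied ones. Naming the void `to()` works only because the name is supplied before the call.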

2/
As Guozhang said, some operators add internal processors.
For example, the branch() method creates one KStreamBranch processor to route
records and one KStreamPassThrough processor for each branch.
In that situation only the parent processor can be named. For child
processors we could keep the current behaviour of adding a suffix (i.e.
KSTREAM-BRANCHCHILD-)

This is also the case for the join() method, which results in adding multiple
processors to the topology (windowing, left/right joins and a merge
processor).
I think that, as for the branch method, users could only define a processor
name prefix.
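The branch() case in 2/ can be sketched as follows, assuming (as proposed) that only the routing processor accepts a user-supplied name while the pass-through children keep generated names. `BranchNaming` is a hypothetical helper, the prefixes only imitate the generated style, and letting a user-named processor still consume an index is an assumption made here for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

// Toy naming model for branch(), not Kafka's implementation: one routing
// (parent) processor plus one pass-through child per branch.
final class BranchNaming {
    private int index; // shared counter, as for other generated names

    BranchNaming(int startIndex) {
        this.index = startIndex;
    }

    private String generate(String prefix) {
        return String.format("%s%010d", prefix, index++);
    }

    List<String> branch(String userName, int branchCount) {
        List<String> names = new ArrayList<>();
        if (userName != null) {
            // Only the parent processor takes the user-supplied name.
            names.add(userName);
            index++;
        } else {
            names.add(generate("KSTREAM-BRANCH-"));
        }
        // Child processors keep generated, index-suffixed names.
        for (int i = 0; i < branchCount; i++) {
            names.add(generate("KSTREAM-BRANCHCHILD-"));
        }
        return names;
    }
}
```

With this model, `new BranchNaming(3).branch("route-by-type", 2)` yields the user name for the router and `KSTREAM-BRANCHCHILD-` names with indexes 4 and 5 for the two children.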

3/
I think we should still add a suffix like "-00" to the processor
name and enforce uppercase, as this will keep some consistency with the
names generated by the API.

4/
Yes, the KTable interface should be modified like KStream to allow
defining custom processor names.

Thanks,



Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-05-31 Thread Guozhang Wang
Hi Florian,

Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying the
processor name after the operator is fine, as long as we do the check again
when modifying it. In fact, we have some topology optimization work
going on which may modify processor names in the final topology anyway
(https://github.com/apache/kafka/pull/4983). Semantically, I think it is
easier for developers to understand than "deciding the processor name for
the next operator".
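Re 1 hinges on re-running the validation when a name changes after the operator was added. A toy sketch of that check is below; `RenamingBuilder` and `RenamableStream` are hypothetical stand-ins, not classes from Kafka Streams, and a real builder would presumably also have to update references from parent and child nodes, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;

// Toy builder, not Kafka's topology builder: tracks processor names and
// re-validates uniqueness whenever a name is changed after the fact.
final class RenamingBuilder {
    final List<String> names = new ArrayList<>();

    void add(String name) {
        checkUnique(name);
        names.add(name);
    }

    void rename(String oldName, String newName) {
        int pos = names.indexOf(oldName);
        if (pos < 0) {
            throw new IllegalArgumentException("unknown processor: " + oldName);
        }
        if (!oldName.equals(newName)) {
            checkUnique(newName); // the "check again" step
        }
        names.set(pos, newName);
    }

    private void checkUnique(String name) {
        if (names.contains(name)) {
            throw new IllegalStateException("duplicate processor name: " + name);
        }
    }
}

// Toy stream handle that remembers the processor that produced it.
final class RenamableStream {
    private final RenamingBuilder builder;
    private String nodeName;

    RenamableStream(RenamingBuilder builder, String nodeName) {
        this.builder = builder;
        this.nodeName = nodeName;
        builder.add(nodeName);
    }

    // Hypothetical as(...): renames the processor that generated this stream.
    RenamableStream as(String newName) {
        builder.rename(nodeName, newName);
        nodeName = newName; // only reached if the rename was accepted
        return this;
    }

    String nodeName() {
        return nodeName;
    }
}
```

In this model a rename that would collide with an existing processor fails with `IllegalStateException` and leaves the original name untouched.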

Re 2: Yeah, I'm thinking that for operators that translate to multiple
processor nodes, we can still use the provided "hint" to name the processors,
e.g. for joins we can name them `join-foo-this` and
`join-foo-that` etc. if the user calls `as("foo")`.

Re 3: The motivation I had for removing the suffix is that it severely
restricts topology compatibility: if user code adds a new operator, or the
library does some optimization that removes some operators, the suffix
indexes may change for a large number of the processor names. This will in
turn change the internal state store names as well as the internal topic
names, making the new application topology incompatible with the old one.
One rationale I had for this KIP is that, aligned with this effort, moving
forward we can allow users to customize internal names so that they can
still be reused even with topology changes (e.g. KIP-230), so I think
removing the suffix index would be more applicable in the long run.



Guozhang






Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-06-10 Thread Matthias J. Sax
Just catching up on this thread.

I like the general idea. Couple of comments:

 - I think that adding `Processed` (or maybe a different name?) is a
valid proposal for stateless operators that only have a single overload
atm. It would align with the overall API design.

 - for all methods with multiple existing overloads, we can consider
extending `Consumed`, `Produced`, `Materialized` etc. to take an additional
processor name (not sure atm how elegant this is; we would need to
"play" with the API a little bit; the advantage would be that we do not
add more overloads, which seems to be key for this KIP)

 - operators that return void: while I agree that the "name first" chaining
idea is not very intuitive, it might still work if we name the method
correctly (again, we would need to "play" with the API a little bit to see)

 - for DSL operators that are translated to multiple nodes: it might
make sense to use the specified operator name as a prefix and add
reasonable suffixes. For example, a join translates into 5 operators
that could be named "name-left-store-processor",
"name-left-join-processor", "name-right-store-processor",
"name-right-join-processor", and "name-join-merge-processor" (or
similar). Maybe just using numbers might also work.

 - I think we should strip the number suffixes if a user provides names

 - enforcing upper case seems to be tricky: for example, we do not
enforce upper case for store names and we cannot easily change it as it
would break compatibility -- thus, for consistency reasons we might not
want to do this

 - for a better understanding of the impact of the KIP, it would be quite
helpful if you would list all method names that are affected in the KIP
(i.e., list all newly added overloads)
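The prefix-plus-suffix idea for operators that expand into several nodes can be sketched as a pure naming function. `JoinNaming` is a hypothetical helper, and the five suffixes simply follow the join example in the bullet list; they are not the names any released version uses.

```java
import java.util.ArrayList;
import java.util.List;

// Derives one name per internal processor from a single user-supplied prefix.
// The suffixes follow the join example in this message and are illustrative only.
final class JoinNaming {
    private static final List<String> SUFFIXES = List.of(
            "left-store-processor",
            "left-join-processor",
            "right-store-processor",
            "right-join-processor",
            "join-merge-processor");

    static List<String> joinProcessorNames(String userName) {
        List<String> names = new ArrayList<>();
        for (String suffix : SUFFIXES) {
            names.add(userName + "-" + suffix);
        }
        return names;
    }

    public static void main(String[] args) {
        joinProcessorNames("orders-payments").forEach(System.out::println);
    }
}
```

Because every derived name shares the user's prefix, the names stay distinct from each other and do not depend on a global index.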


-Matthias



Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-07-05 Thread Florian Hussonnois
Hi, thank you very much for all your suggestions. I've started to update the
KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL).
Also, I propose to rename the Processed class to Described; this will be
more meaningful (but this is just a detail).

I'm OK with not enforcing uppercase for specific names, but should we allow
arbitrary names, with whitespace for example? Currently, I can't tell whether
this could lead to side effects.


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-07-06 Thread John Roesler
Hi Florian,

Sorry I'm late to the party, but I missed the message originally.

Regarding the names, it's probably a good idea to stick to the same
character set we're currently using: letters, numbers, and hyphens. The
names are used in Kafka topics, files and folders, and RocksDB databases,
and we also need them to work with the file systems of Windows, Linux, and
MacOS. My opinion is that with a situation like that, it's better to be
conservative. It might also be a good idea to impose an upper limit on name
length to avoid running afoul of any of those systems.

---

It seems like there's a small debate between 1) adding a new method to
KStream (and maybe KTable) to modify its name after the fact, or 2)
piggy-backing on the config objects where they exist and adding one where
they don't. To me, #2 is the better alternative even though it produces
more overloads and may be a bit awkward in places.

The reason is simply that #1 is a high-level departure from the
graph-building paradigm we're using in the DSL. Consider:

Graph.node1(config).node2(config)

vs

Graph.node1().config().node2().config()

We could have done either, but we picked the former. I think it's probably
a good goal to try and stick to it so that developers can develop and rely
on their instincts for how the DSL will behave.

I do want to present one alternative to adding new config objects: we can
just add a "name()" method to all our "action" interfaces. For example,
I'll demonstrate how we can add a "name" to Predicate and then use it to
name a "KStream#filter" DSL operator:

public interface Predicate<K, V> {
    // existing method
    boolean test(final K key, final V value);

    // new default method adds the ability to name the predicate
    default String name() {
        return null;
    }

    // new static factory method adds the ability to wrap lambda predicates
    // with a named predicate
    static <K, V> Predicate<K, V> named(final String name,
                                        final Predicate<K, V> predicate) {
        return new Predicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}

Then, here's how it would look to use it:

// Anonymous predicates continue to work just fine
stream.filter((k, v) -> true);

// Devs can swap in a Predicate that implements the name() method.
stream.filter(new Predicate<Object, Object>() {
    @Override
    public boolean test(final Object key, final Object value) {
        return true;
    }

    @Override
    public String name() {
        return "hey";
    }
});

// Or they can wrap their existing lambda using the static factory method
stream.filter(named("key", (k, v) -> true));
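The snippet above depends on Kafka Streams. To check the mechanics in isolation, here is a self-contained sketch that uses a hypothetical `NamedPredicate` interface in place of `Predicate` and a toy `ToyTopology.filter` that records which processor name it would use; neither is part of the Kafka API. A lambda still satisfies the interface because `name()` is a default method, so the interface keeps a single abstract method.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the proposed Predicate; not Kafka's interface.
@FunctionalInterface
interface NamedPredicate<K, V> {
    boolean test(K key, V value);

    // Default: no name, so the DSL would generate one.
    default String name() {
        return null;
    }

    // Wraps a lambda so that it also carries a name.
    static <K, V> NamedPredicate<K, V> named(String name, NamedPredicate<K, V> predicate) {
        return new NamedPredicate<K, V>() {
            @Override
            public boolean test(K key, V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}

// Toy topology: records the processor name a filter() call would get.
final class ToyTopology {
    final List<String> processorNames = new ArrayList<>();

    <K, V> void filter(NamedPredicate<K, V> predicate) {
        String name = predicate.name();
        processorNames.add(name != null
                ? name
                : String.format("KSTREAM-FILTER-%010d", processorNames.size()));
    }
}
```

Calling `filter` with a plain lambda and then with `NamedPredicate.named("drop-nulls", ...)` records one generated name followed by `drop-nulls`, while the wrapped predicate still delegates `test` to the original lambda.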

Just a thought.

Overall, I think it's really valuable to be able to name the processors,
for all the reasons you mentioned in the KIP. So thank you for introducing
this!

Thanks,
-John

On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois 
wrote:

> Hi, thank you very much for all your suggestions. I've started to update the
> KIP (
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> ).
> Also, I propose to rename the Processed class to Described - this will be
> more meaningful (but this is just a detail).
>
> I'm OK with not enforcing uppercase for specific names, but should we allow
> arbitrary names, with whitespace for example? Currently, I can't tell
> whether this could lead to side effects.
>
> On Mon, Jun 11, 2018 at 01:31, Matthias J. Sax wrote:
>
> > Just catching up on this thread.
> >
> > I like the general idea. Couple of comments:
> >
> >  - I think that adding `Processed` (or maybe a different name?) is a
> > valid proposal for stateless operators that only have a single overload
> > atm. It would align with the overall API design.
> >
> >  - for all methods with multiple existing overloads, we can consider
> > extending `Consumed`, `Produced`, `Materialized` etc. to take an additional
> > processor name (not sure atm how elegant this is; we would need to
> > "play" with the API a little bit; the advantage would be that we do not
> > add more overloads, which seems to be key for this KIP)
> >
> >  - operators return void: while I agree that the "name first" chaining
> > idea is not very intuitive, it might still work, if we name the method
> > correctly (again, we would need to "play" with the API a little bit to
> > see)
> >
> >  - for DSL operators that are translated to multiple nodes: it might
> > make sense to use the specified operator name as prefix and add
> > reasonable suffixes. For example, a join translates into 5 operators
> > that could be named "name-left-store-processor",
> > "name-left-join-processor", "name-right-store-processor",
> > "name-right-join-processor", and "name-join-merge-processor" (or
> > similar). Maybe just using numbers might also work.
> >  
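Matthias's prefix-plus-suffix idea for operators that expand into several nodes can be sketched as a small helper. The suffixes are the ones from his join example; the class and method names are hypothetical, and the real list would depend on how the DSL actually translates each operator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: derive one processor name per internal node of a
// multi-node DSL operator from a single user-supplied operator name.
class JoinNodeNamesDemo {
    // Suffixes taken from the join example in the thread; illustrative only.
    static final String[] JOIN_SUFFIXES = {
        "-left-store-processor",
        "-left-join-processor",
        "-right-store-processor",
        "-right-join-processor",
        "-join-merge-processor"
    };

    // Prefix every suffix with the user-supplied operator name.
    static List<String> nodeNames(final String operatorName) {
        final List<String> names = new ArrayList<>();
        for (final String suffix : JOIN_SUFFIXES) {
            names.add(operatorName + suffix);
        }
        return names;
    }

    public static void main(final String[] args) {
        nodeNames("orders-customers").forEach(System.out::println);
    }
}
```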

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-07-06 Thread Guozhang Wang
Hi folks, just to summarize the options we have so far:

1) Add a new "as" for KTable / KStream, plus add new fields to the control
objects of operators that return void (the current wiki's proposal).

Pros: no more overloads.
Cons: a bit of a departure from the current high-level API design of the DSL,
plus an inconsistency between operators that return void and operators that
do not.

2) Add overloaded functions for all operators that accept a new control
object "Described".

Pros: consistent with current APIs.
Cons: lots of overloaded functions to add.

3) Add another default function in the interface (thank you J8!) as John
proposed.

Pros: no overloaded functions, no "Described".
Cons: do we really lose lambda functions (it seems not, if we provide a
"named" for each func)? Plus, "Described" may be more extensible than a
single `String`.


My main criterion for deciding which one is better is "how to make the
additional API easy to use for advanced users, while keeping it hidden from
normal users who do not care at all". For that purpose I think
3) > 1) > 2).

One caveat, though: changing the interface would not be binary-compatible,
though it would be source-compatible, right? I.e. users need to recompile
their code, though no changes are needed.



Another note: for 3), if we really want to keep the extensibility of
Described we could do something like:

-

public interface Predicate<K, V> {
    // existing method
    boolean test(final K key, final V value);

    // new default method adds the ability to name the predicate
    default Described described() {
        return new Described(null);
    }
}

--

where user code becomes:

// note `named` now just sets a Described("key") in "described()"
stream.filter(named("key", (k, v) -> true));

stream.filter(described(Described.as("key" /* , any other fancy parameters
                                                in the future */), (k, v) -> true));
--


I feel it is not very likely that we'd need to extend it further in the
future, so just a `String` would be good enough. But I'm listing all
possibilities here.
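Guozhang's more extensible variant can be sketched as follows. `Described` and `DescribedPredicate` are hypothetical local types, not Kafka classes; the single `name` field stands in for whatever further parameters might be added later, and `named(...)` is just shorthand for `described(Described.as(name), ...)`.

```java
import java.util.Objects;

// Hypothetical sketch of option 3 using an extensible "Described" object
// instead of a bare String. Not an actual Kafka Streams API.
class DescribedDemo {
    public static void main(final String[] args) {
        final DescribedPredicate<String, Integer> plain = (k, v) -> v > 0;
        final DescribedPredicate<String, Integer> named =
            DescribedPredicate.named("positive", (k, v) -> v > 0);
        System.out.println(plain.described().name()); // null
        System.out.println(named.described().name()); // positive
    }
}

final class Described {
    private final String name; // further optional fields could be added later

    private Described(final String name) {
        this.name = name;
    }

    static Described as(final String name) {
        return new Described(Objects.requireNonNull(name, "name"));
    }

    static Described none() {
        return new Described(null);
    }

    String name() {
        return name;
    }
}

interface DescribedPredicate<K, V> {
    boolean test(K key, V value);

    // Default: no description, so the DSL would generate a name.
    default Described described() {
        return Described.none();
    }

    // Attaches an arbitrary Described to an existing predicate.
    static <K, V> DescribedPredicate<K, V> described(final Described described,
                                                     final DescribedPredicate<K, V> predicate) {
        return new DescribedPredicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public Described described() {
                return described;
            }
        };
    }

    // Convenience wrapper: named(...) just sets Described.as(name).
    static <K, V> DescribedPredicate<K, V> named(final String name,
                                                 final DescribedPredicate<K, V> predicate) {
        return described(Described.as(name), predicate);
    }
}
```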



Guozhang






On Fri, Jul 6, 2018 at 8:19 AM, John Roesler  wrote:

> Hi Florian,
>
> Sorry I'm late to the party, but I missed the message originally.
>
> Regarding the names, it's probably a good idea to stick to the same
> character set we're currently using: letters, numbers, and hyphens. The
> names are used in Kafka topics, files and folders, and RocksDB databases,
> and we also need them to work with the file systems of Windows, Linux, and
> MacOS. My opinion is that with a situation like that, it's better to be
> conservative. It might also be a good idea to impose an upper limit on name
> length to avoid running afoul of any of those systems.
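John's suggestion (letters, digits and hyphens only, plus an upper bound on length) could be enforced with a check like this hypothetical sketch. The length bound is a parameter because the thread does not fix a value; for comparison, Kafka topic names themselves also permit `.` and `_`, so this character set is deliberately stricter.

```java
import java.util.regex.Pattern;

// Hypothetical validator for user-supplied processor names, following the
// conservative character set suggested above: ASCII letters, digits, hyphens.
class ProcessorNameValidator {
    private static final Pattern LEGAL = Pattern.compile("[A-Za-z0-9-]+");

    // maxLength is passed in because the exact limit is left open.
    static boolean isValid(final String name, final int maxLength) {
        return name != null
            && name.length() <= maxLength
            && LEGAL.matcher(name).matches();
    }

    public static void main(final String[] args) {
        System.out.println(isValid("BRANCH-STREAM-ON-VALUE", 64)); // true
        System.out.println(isValid("has whitespace", 64));         // false
    }
}
```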
>
> ---
>
> It seems like there's a small debate between 1) adding a new method to
> KStream (and maybe KTable) to modify its name after the fact, or 2)
> piggy-backing on the config objects where they exist and adding one where
> they don't. To me, #2 is the better alternative even though it produces
> more overloads and may be a bit awkward in places.
>
> The reason is simply that #1 is a high-level departure from the
> graph-building paradigm we're using in the DSL. Consider:
>
> Graph.node1(config).node2(config)
>
> vs
>
> Graph.node1().config().node2().config()

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-07-06 Thread Florian Hussonnois
Hi,

Option #3 seems to be a good alternative and I find the API more
elegant (thanks John).

But we still need to overload some methods, either because they do
not accept an action instance or because they are translated to multiple
processors.

For example, this is the case for the methods branch() and merge(). We could
introduce a new interface Named (or maybe a different name?) with a method
name(). All action interfaces could extend this one to implement option
3).
This would result in the following overloads:

KStream<K, V> merge(final Named name, final KStream<K, V> stream);
KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates);

N.B.: The list above is not exhaustive.

-
User code would become:

KStream<String, Integer> stream = builder.stream("test");
KStream<String, Integer>[] branches =
    stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
                  Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
                  Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));

branches[0].to("pair");
branches[1].to("impair");
-

This is a mix of options 3) and 1).
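Florian's mixed approach can be sketched in plain Java. `Named`, `BranchPredicate` and `branchNodeNames` are hypothetical stand-ins (not the Kafka API), and the `<operator name>-<index>` fallback for unnamed predicates is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch mixing options 1) and 3): the multi-node operator takes
// a Named argument, while each branch predicate may carry its own name.
class BranchNamingDemo {
    public static void main(final String[] args) {
        final BranchPredicate<String, Integer> even =
            BranchPredicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0);
        final BranchPredicate<String, Integer> odd = (k, v) -> v % 2 != 0; // left unnamed
        branchNodeNames(Named.with("BRANCH-STREAM-ON-VALUE"), even, odd)
            .forEach(System.out::println);
    }

    // Returns the name of the branch node followed by one name per child node.
    @SafeVarargs
    static <K, V> List<String> branchNodeNames(final Named named,
                                               final BranchPredicate<K, V>... predicates) {
        final List<String> names = new ArrayList<>();
        names.add(named.name());
        for (int i = 0; i < predicates.length; i++) {
            final String childName = predicates[i].name();
            names.add(childName != null ? childName : named.name() + "-" + i);
        }
        return names;
    }
}

final class Named {
    private final String name;

    private Named(final String name) {
        this.name = name;
    }

    static Named with(final String name) {
        return new Named(name);
    }

    String name() {
        return name;
    }
}

interface BranchPredicate<K, V> {
    boolean test(K key, V value);

    default String name() {
        return null;
    }

    static <K, V> BranchPredicate<K, V> named(final String name, final BranchPredicate<K, V> predicate) {
        return new BranchPredicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}
```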

On Fri, Jul 6, 2018 at 22:58, Guozhang Wang wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2018-07-19 Thread Guozhang Wang
Hello Florian,

Sorry for being late... I find myself apologizing for late replies a lot
these days. But I do want to push this KIP's progress forward, as I see it
as a very important and helpful feature for extensibility.

About the exceptions, I've gone through them and hopefully this is an
exhaustive list:

1. KTable#toStream()
2. KStream#merge(KStream)
3. KStream#process() / transform() / transformValues()
4. KGroupedTable / KGroupedStream#count()


Here's my reasoning:

* It is okay not to let users override the name for 1/2, since they are
too trivial to be useful for debugging, plus their processor names would
not determine any related topic / store names.
* For 3, I'd vote for adding overloaded functions with Named.
* For 4, if users really want to name the processor they can call
aggregate() instead, so I think it is okay to skip this case.
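Point 4 relies on `count()` being a special case of `aggregate()`. The plain-Java sketch below (no Kafka dependency; `Aggregator`, `aggregate` and `count` are local, hypothetical stand-ins) shows that equivalence: counting is aggregation with an initializer of `0` and an adder that returns `agg + 1`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java illustration: count() is aggregate() with a fixed initializer
// and adder, so a user wanting a custom name could call aggregate() instead.
class CountViaAggregateDemo {
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Folds records per key, starting each key from initializer.get().
    static <K, V, A> Map<K, A> aggregate(final List<Map.Entry<K, V>> records,
                                         final Supplier<A> initializer,
                                         final Aggregator<K, V, A> aggregator) {
        final Map<K, A> result = new LinkedHashMap<>();
        for (final Map.Entry<K, V> record : records) {
            final A current = result.containsKey(record.getKey())
                ? result.get(record.getKey())
                : initializer.get();
            result.put(record.getKey(), aggregator.apply(record.getKey(), record.getValue(), current));
        }
        return result;
    }

    // count() expressed through aggregate(): start at 0, add 1 per record.
    static <K, V> Map<K, Long> count(final List<Map.Entry<K, V>> records) {
        return aggregate(records, () -> 0L, (key, value, agg) -> agg + 1);
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, String>> records = List.of(
            Map.entry("a", "x"), Map.entry("b", "y"), Map.entry("a", "z"));
        System.out.println(count(records)); // {a=2, b=1}
    }
}
```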


Guozhang



On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois 
wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-25 Thread Florian Hussonnois
Hi Matthias, Bill,

I've updated the KIP with your latest feedback. However, you suggested
renaming `Suppressed#withName(String)`, but withName is not a static method
like Joined.named was: withName is part of the new interface
NamedOperation.
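A naming interface of this kind is usually self-typed so that `withName` returns the concrete config type. The sketch below re-declares a minimal `NamedOperation` locally (to our understanding the interface that later shipped in Kafka Streams has this shape, with `T withName(String)`, but treat that as an assumption) and pairs it with a hypothetical, immutable `PrintConfig` class.

```java
// Minimal local re-declaration of a self-typed naming interface, plus a
// hypothetical immutable config class that implements it.
class NamedOperationDemo {
    public static void main(final String[] args) {
        final PrintConfig unnamed = PrintConfig.toSysOut();
        final PrintConfig named = unnamed.withName("print-orders");
        System.out.println(unnamed.name()); // null
        System.out.println(named.name());   // print-orders
    }
}

interface NamedOperation<T extends NamedOperation<T>> {
    // Instance method: returns a config of the concrete type carrying the name.
    T withName(String name);
}

final class PrintConfig implements NamedOperation<PrintConfig> {
    private final String name;

    private PrintConfig(final String name) {
        this.name = name;
    }

    // Static factory for the config itself; naming is a separate, optional step.
    static PrintConfig toSysOut() {
        return new PrintConfig(null);
    }

    @Override
    public PrintConfig withName(final String name) {
        return new PrintConfig(name); // copy, so the original stays unnamed
    }

    String name() {
        return name;
    }
}
```

Returning a copy rather than mutating `this` is a design choice of the sketch; a mutable builder that returns `this` would satisfy the same interface.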

In addition, I've split the PR into 5 commits so that it will be much easier
to review.

Thanks

On Thu, Feb 21, 2019 at 23:54, Bill Bejeck wrote:

> Hi Florian,
>
> Overall the KIP LGTM.  Once you've addressed the final comments from
> Matthias I think we can put this up for a vote.
>
> Thanks,
> Bill
>
> On Wed, Feb 20, 2019 at 9:42 PM Matthias J. Sax 
> wrote:
>
> > Florian,
> >
> > thanks for updating the KIP (and no worries about the late reply -- the
> > 2.2 release kept us busy anyway...). Overall LGTM.
> >
> > Just some nits:
> >
> >
> > KStream-Table:
> >
> > Do we need to list the existing stream-globalTable join methods in the
> > first table? (I thought it should only contain new/changing methods.)
> >
> > typo: `join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named)`
> >
> > `leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)` is missing the new
> > `Named` parameter.
> >
> > `static Joined#named(final String name)`
> >  -> should be `#as(...)` instead of `named(...)`
> >
> > flatTransform() is missing (cf. KIP-313)
> >
> >
> >
> > KTable-table:
> >
> > `Suppressed#withName(String)`
> >  -> should we change this to `#as(...)` too (similar to `named()`)?
> >
> >
> >
> > -Matthias
> >
> >
> >
> > On 1/25/19 9:49 AM, Matthias J. Sax wrote:
> > > I was reading the KIP again, and there are still some open questions and
> > > inconsistencies:
> > >
> > > For example for `KGroupedStream#count(Named)` the KIP says, that only
> > > the processor will be named, while the state store name will be `PREFIX
> > > + COUNT` (ie, an auto-generated name). Additionally, for
> > > `KGroupedStream#count(Named, Materialized)` the processor will be named
> > > according to `Named` and the store will be named according to
> > > `Materialized.as()`. So far so good. It implies that naming the
> > > processor and naming the store are independent. (This pattern is
> > > applied to all aggregation functions, for KStream and KTable).
> > >
> > > However, for `KTable#filter(Predicate, Named)` the KIP says that both the
> > > processor name and the store name are set. This sounds wrong (ie,
> > > inconsistent with the first paragraph above), because there is also
> > > `KTable#filter(Predicate, Named, Materialized)`. Also note that, for the
> > > first operator, the store might not be materialized at all. (This
> > > issue is there for all KTable operators -- stateless and stateful).
> > >
> > > Finally, there is the following statement in the KIP:
> > >
> > >> Also, note that for all methods accepting a Materialized argument, if
> > >> no state store named is provided then the node named will be used to
> > >> generate a one. The state store name will be the node name suffixed
> > >> with "-table".
> > >
> > >
> > > This contradicts the non-naming of stores described at the very beginning.
> > >
> > >
> > > Also, the KIP still contains the question about `join(GlobalKTable,
> > > KeyValueMapper, ValueJoiner)` and `leftJoin(GlobalKTable,
> > > KeyValueMapper, ValueJoiner)`. I think a consistent approach would be to
> > > add one overload each that takes a `Named` parameter.
> > >
> > >
> > > Thoughts?
> > >
> > >
> > > -Matthias
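The inconsistency Matthias describes can be seen by contrasting two fallback rules for the store name when no `Materialized.as(...)` name is given. Both methods below are hypothetical illustrations, not Kafka code: the first keeps store naming independent of the processor name, the second applies the `-table` suffix rule quoted from the KIP draft.

```java
import java.util.Optional;

// Hypothetical comparison of two store-naming fallback rules.
class StoreNamingDemo {
    // Independent rule: only Materialized.as(...) names the store; otherwise
    // the DSL's own generated name is used. processorName is intentionally
    // ignored, to show that it has no influence.
    static String independentStoreName(final Optional<String> materializedAs,
                                       final String processorName,
                                       final String generatedStoreName) {
        return materializedAs.orElse(generatedStoreName);
    }

    // Rule quoted from the KIP draft: fall back to "<processor name>-table".
    static String derivedStoreName(final Optional<String> materializedAs,
                                   final String processorName,
                                   final String generatedStoreName) {
        return materializedAs.orElse(processorName + "-table");
    }

    public static void main(final String[] args) {
        final String generated = "GENERATED-STORE-NAME"; // placeholder value
        System.out.println(independentStoreName(Optional.empty(), "my-filter", generated)); // GENERATED-STORE-NAME
        System.out.println(derivedStoreName(Optional.empty(), "my-filter", generated));     // my-filter-table
    }
}
```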
> > >
> > >
> > > On 1/17/19 2:56 PM, Bill Bejeck wrote:
> > >> +1 for me on Guozhang's proposal for changes to Joined.
> > >>
> > >> Thanks,
> > >> Bill
> > >>
> > >> On Thu, Jan 17, 2019 at 5:55 PM Matthias J. Sax <matth...@confluent.io>
> > >> wrote:
> > >>
> > >>> Thanks for all the follow up comments!
> > >>>
> > >>> As I mentioned earlier, I am ok with adding overloads instead of using
> > >>> Materialized to specify the processor name. Seems this is what the
> > >>> majority of people prefer.
> > >>>
> > >>> I am also +1 on Guozhang's suggestion to deprecate `static
> > >>> Joined#named()` and replace it with `static Joined#as` for consistency
> > >>> and to deprecate getter `Joined#name()` for removal and introduce
> > >>> `JoinedInternal` to access the name.
> > >>>
> > >>> @Guozhang: the vote is already up :)
> > >>>
> > >>>
> > >>> -Matthias
> > >>>
> > >>> On 1/17/19 2:45 PM, Guozhang Wang wrote:
> > >>>> Wow that's a lot of discussions in 6 days! :) Just catching up and
> > >>>> sharing my two cents here:
> > >>>>
> > >>>> 1. Materialized: I'm inclined to not let Materialized extend Named
> > >>>> and add the overload as well. All the rationales have been very well
> > >>>> summarized before. Just to emphasize John's points: Materialized is
> > >>>> considered as the control object being leveraged by the optimization
> > >>>> framework to determine if the state store should be physically
> > >>>> materialized or not. So let's say if the user does not want to query
> > >>>> the store (hence it can just be locally materializ

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-26 Thread Bill Bejeck
Hi Florian,

Thanks for the update to the KIP.

As for changing the name for "Suppressed#withName", I believe we can update
the table in the KIP to say "Suppressed#as", as the KIP states that:

> In addition, we propose to add a new static method with the following
> signature to each of those class *as(final String processorName).*

where Suppressed is one of the classes listed.
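The convention Bill quotes, a static `as(final String processorName)` factory on each config class, can be sketched with a hypothetical `JoinConfig`. Keeping the older static `named(...)` as a deprecated alias mirrors the `Joined#named` to `Joined#as` change discussed earlier in the thread. The pattern assumes the class has a sensible default configuration for `as(...)` to name.

```java
// Hypothetical config class showing the "static as(name)" convention:
// a static factory for the common case plus a chained withName(...),
// with an older static named(...) kept only as a deprecated alias.
class StaticAsDemo {
    public static void main(final String[] args) {
        System.out.println(JoinConfig.as("orders-join").name()); // orders-join
        System.out.println(JoinConfig.defaults().name());        // null
    }
}

final class JoinConfig {
    private final String name;

    private JoinConfig(final String name) {
        this.name = name;
    }

    // The default configuration is usable as-is, which is what makes a
    // name-only static factory meaningful for this class.
    static JoinConfig defaults() {
        return new JoinConfig(null);
    }

    static JoinConfig as(final String name) {
        return new JoinConfig(name);
    }

    /** @deprecated use {@link #as(String)} instead. */
    @Deprecated
    static JoinConfig named(final String name) {
        return as(name);
    }

    JoinConfig withName(final String name) {
        return new JoinConfig(name);
    }

    String name() {
        return name;
    }
}
```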

So once we make that minor update to the KIP, we can start the vote.

Thanks!
Bill



On Mon, Feb 25, 2019 at 5:24 AM Florian Hussonnois 
wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-26 Thread John Roesler
Thanks for the feedback, Bill.

It might be a good idea to hold off on adding the static method to
Suppressed for now...

Unlike the other operator/config-class pairs, `suppress` has no "default"
mode. That is, there is no `KTable.suppress()` method with no arguments.
So, when using it, you must either pick `untilWindowCloses` or
`untilTimeLimit`, and you can also only pick one of those.

The current `withName` method is an optional way to add a name to the
Suppressed config.

Adding `Suppressed.as(name)` as another static factory method seems to have
some real downsides at the moment:
* `suppress(Suppressed.as(name))` is an invalid config, so we can either go
to extra lengths with an intermediate builder class just to store the name,
or lose the compile-time guarantee that you actually pick one of the
suppression types, and instead just check at run time.
* The two config choices are currently static factory methods, so adding
`as(name)` obligates us to also add chained versions of `untilWindowCloses`
and `untilTimeLimit`. This opens up a new can of worms to name the chained
methods, and it also creates more ambiguity in the API, since there are
then multiple ways to say the same thing.
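A simplified model of the design John describes follows. `SuppressionConfig` and `SuppressionMode` are hypothetical; the real `Suppressed` factories also take buffer-config and time-limit arguments, which are omitted here. The point is structural: because the only entry points are the two mode factories, every config has a mode, and the name can only be added afterwards.

```java
// Hypothetical, simplified model: the only entry points are the two mode
// factories, so every config has a mode; the name is an optional extra.
class SuppressionConfigDemo {
    public static void main(final String[] args) {
        final SuppressionConfig config =
            SuppressionConfig.untilWindowCloses().withName("final-results");
        System.out.println(config.mode() + " " + config.name()); // UNTIL_WINDOW_CLOSES final-results
    }
}

enum SuppressionMode { UNTIL_WINDOW_CLOSES, UNTIL_TIME_LIMIT }

final class SuppressionConfig {
    private final SuppressionMode mode; // never null: always set by a factory
    private final String name;          // optional

    private SuppressionConfig(final SuppressionMode mode, final String name) {
        this.mode = mode;
        this.name = name;
    }

    static SuppressionConfig untilWindowCloses() {
        return new SuppressionConfig(SuppressionMode.UNTIL_WINDOW_CLOSES, null);
    }

    static SuppressionConfig untilTimeLimit() {
        return new SuppressionConfig(SuppressionMode.UNTIL_TIME_LIMIT, null);
    }

    // A static as(name) would have no mode to store here, so it would need
    // either a null mode checked at run time or an extra builder type.
    SuppressionConfig withName(final String name) {
        return new SuppressionConfig(mode, name);
    }

    SuppressionMode mode() {
        return mode;
    }

    String name() {
        return name;
    }
}
```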

Maybe in the future, if there's some automatic default configuration for
suppression, then we can add `as(name)`, but from where I'm sitting right
now, it seems to have no real upside and a few downsides.

So, to summarize, I would propose to basically leave `Suppressed`'s
interface alone and just extend the new `NamedOperation`.

WDYT?

Thanks,
-John

On Tue, Feb 26, 2019 at 9:18 AM Bill Bejeck  wrote:


Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-26 Thread Guozhang Wang
Bill, John, thanks for your comments.

I agree with John that we can leave Suppressed without a static method
`as` for now, since it is not useful at the moment. Assuming that is agreed
on, I think we can move on to the voting process.


Guozhang
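
The trade-off John describes in the quoted message can be modeled in a few lines. This is a hedged sketch, not Kafka's code: `NamedOperationSketch` and `SuppressedSketch` are hypothetical stand-ins for the KIP's `NamedOperation` and for `Suppressed`, and the buffer configuration and type parameters that the real `untilWindowCloses`/`untilTimeLimit` factories take are omitted. It shows that when the only entry points are the two strategy factories, every instance the compiler allows is a valid config, and `withName` remains an optional chained call.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical stand-in for the KIP's NamedOperation: an optional,
// chainable way to attach a processor name to an existing config.
interface NamedOperationSketch<T extends NamedOperationSketch<T>> {
    T withName(String name);
}

// Hypothetical stand-in for Suppressed. The constructor is private and the
// only entry points are the two strategy factories, so a config without a
// strategy cannot be built.
final class SuppressedSketch implements NamedOperationSketch<SuppressedSketch> {
    enum Strategy { UNTIL_WINDOW_CLOSES, UNTIL_TIME_LIMIT }

    final Strategy strategy;
    final Duration timeLimit; // null unless strategy == UNTIL_TIME_LIMIT
    final String name;        // null means "use a generated name"

    private SuppressedSketch(Strategy strategy, Duration timeLimit, String name) {
        this.strategy = Objects.requireNonNull(strategy);
        this.timeLimit = timeLimit;
        this.name = name;
    }

    static SuppressedSketch untilWindowCloses() {
        return new SuppressedSketch(Strategy.UNTIL_WINDOW_CLOSES, null, null);
    }

    static SuppressedSketch untilTimeLimit(Duration timeLimit) {
        return new SuppressedSketch(Strategy.UNTIL_TIME_LIMIT,
                Objects.requireNonNull(timeLimit), null);
    }

    // Optional naming step; keeps the chosen strategy unchanged.
    @Override
    public SuppressedSketch withName(String name) {
        return new SuppressedSketch(strategy, timeLimit, name);
    }

    // Deliberately absent: static SuppressedSketch as(String name).
    // It would have no strategy to pass to the constructor, so it would need
    // either a nullable strategy checked at run time, or a separate
    // intermediate builder type with its own chained untilWindowCloses()
    // and untilTimeLimit() methods.
}
```

With this shape, `untilTimeLimit(Duration.ofMinutes(5)).withName("suppress-updates")` is the only way to spell a named, time-limited config, which is the single-spelling property John wants to keep.
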

On Tue, Feb 26, 2019 at 2:56 PM John Roesler  wrote:

> Thanks for the feedback, Bill.
>
> It might be a good idea to hold off on adding the static method to
> Suppressed for now...
>
> Unlike the other operator/config-class pairs, `suppress` has no "default"
> mode. That is, there is no `KTable.suppress()` method with no arguments.
> So, when using it, you must either pick `untilWindowCloses` or
> `untilTimeLimit`, and you can also only pick one of those.
>
> The current `withName` method is an optional way to add a name to the
> Suppressed config.
>
> Adding `Suppressed.as(name)` as another static factory method seems to have
> some real downsides at the moment:
> * `suppress(Suppressed.as(name))` is an invalid config, so we can either go
> to extra lengths with an intermediate builder class just to store the name,
> or lose the compile-time guarantee that you actually pick one of the
> suppression types, and instead just check at run time.
> * The two config choices are currently static factory methods, so adding
> `as(name)` obligates us to also add chained versions of `untilWindowCloses`
> and `untilTimeLimit`. This opens up a new can of worms to name the chained
> methods, and it also creates more ambiguity in the API, since there are
> then multiple ways to say the same thing.
>
> Maybe in the future, if there's some automatic default configuration for
> suppression, then we can add `as(name)`, but from where I'm sitting right
> now, it seems to have no real upside and a few downsides.
>
> So, to summarize, I would propose to basically leave `Suppressed`'s
> interface alone and just extend the new `NamedOperation`.
>
> WDYT?
>
> Thanks,
> -John
>
> On Tue, Feb 26, 2019 at 9:18 AM Bill Bejeck  wrote:
>
> > Hi Florian,
> >
> > Thanks for the update to the KIP.
> >
> > As for changing the name for "Suppressed#withName", I believe we can update
> > the table in the KIP to say "Suppressed#as", as the KIP states that:
> >
> > >> In addition, we propose to add a new static method with the following
> > >> signature to each of those classes: *as(final String processorName).*
> >
> > where Suppressed is one of the classes listed.
> >
> > So once we make that minor update to the KIP, we can start the vote.
> >
> > Thanks!
> > Bill
> >
> >
> >
> > On Mon, Feb 25, 2019 at 5:24 AM Florian Hussonnois <fhussonn...@gmail.com> wrote:
> >
> > > Hi Matthias, Bill,
> > >
> > > I've updated the KIP with your latest feedback. However, regarding your
> > > suggestion to rename `Suppressed#withName(String)`: withName is not a
> > > static method like Joined.named was. The withName method is part of the
> > > new interface NamedOperation.
> > >
> > > In addition, I've split the PR into 5 commits so that it will be much
> > > easier to review.
> > >
> > > Thanks
> > >
> > > On Thu, Feb 21, 2019 at 11:54 PM Bill Bejeck wrote:
> > >
> > > > Hi Florian,
> > > >
> > > > Overall the KIP LGTM.  Once you've addressed the final comments from
> > > > Matthias I think we can put this up for a vote.
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Wed, Feb 20, 2019 at 9:42 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > > >
> > > > > Florian,
> > > > >
> > > > > thanks for updating the KIP (and no worries about the late reply -- 2.2
> > > > > release kept us busy anyway...). Overall LGTM.
> > > > >
> > > > > Just some nits:
> > > > >
> > > > >
> > > > > KStream-Table:
> > > > >
> > > > > Do we need to list the existing stream-globalTable join methods in the
> > > > > first table (I thought it should only contain new/changing methods)?
> > > > >
> > > > > typo: `join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named)`
> > > > >
> > > > > `leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)` is missing the new
> > > > > `Named` parameter.
> > > > >
> > > > > `static Joined#named(final String name)`
> > > > >  -> should be `#as(...)` instead of `named(...)`
> > > > >
> > > > > flatTransform() is missing (cf. KIP-313)
> > > > >
> > > > >
> > > > >
> > > > > KTable-table:
> > > > >
> > > > > `Suppressed#withName(String)`
> > > > >  -> should we change this to `#as(...)` too (similar to `named()`)
> > > > >
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > >
> > > > > On 1/25/19 9:49 AM, Matthias J. Sax wrote:
> > > > > > I was reading the KIP again, and there are still some open questions and
> > > > > > inconsistencies:
> > > > > > inconsistencies:
> > > > > >
> > > > > > For example, for `KGroupedStream#count(Named)` the KIP says that only
> > > > > > the processor will be named, while the state store name will be
> > > > > > `PREFIX + COUNT` (ie, an auto-generated name). Additionally, for
> > > > > > `KGroupedStream#count(Named, Materialized)` the processo

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-26 Thread Matthias J. Sax
Florian,

Thanks for updating the KIP.

I missed that `Suppressed#withName()` is not `static`, and I agree with
John and Guozhang.
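
The static-versus-instance distinction behind this exchange can be sketched as follows. For a config class that has a valid all-defaults state, a static `as(name)` factory (the spelling the thread prefers over `Joined.named`) and the instance `withName(name)` from `NamedOperation` can coexist. `NamedConfigSketch` and `JoinedSketch` are hypothetical simplifications for illustration only, and the key serde is modeled as a plain `String` label rather than a real `Serde`.

```java
// Hypothetical stand-in for the KIP's NamedOperation interface.
interface NamedConfigSketch<T extends NamedConfigSketch<T>> {
    T withName(String name);
}

// Hypothetical stand-in for a config class such as Joined. Every field is
// optional, so an instance that carries only a name is still a valid config.
final class JoinedSketch implements NamedConfigSketch<JoinedSketch> {
    final String name;     // null: use a generated processor name
    final String keySerde; // null: fall back to the default serde

    private JoinedSketch(String name, String keySerde) {
        this.name = name;
        this.keySerde = keySerde;
    }

    // Static entry point that sets only the name.
    static JoinedSketch as(String name) {
        return new JoinedSketch(name, null);
    }

    // Static entry point that sets only the key serde.
    static JoinedSketch keySerde(String keySerde) {
        return new JoinedSketch(null, keySerde);
    }

    // Instance method inherited from NamedConfigSketch: names an existing config.
    @Override
    public JoinedSketch withName(String name) {
        return new JoinedSketch(name, keySerde);
    }

    JoinedSketch withKeySerde(String keySerde) {
        return new JoinedSketch(name, keySerde);
    }
}
```

Both `JoinedSketch.as("join-orders")` and `JoinedSketch.keySerde(...).withName("join-orders")` end up with the same processor name. In this sketch that redundancy is harmless because `as(name)` alone is already a complete config, which is exactly what does not hold for `Suppressed`.
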

I don't have any further comments. And thanks for splitting the PR!


@Guozhang: there is already a VOTE thread.



-Matthias

On 2/26/19 3:26 PM, Guozhang Wang wrote:
> Bill, John, thanks for your comments.
> 
> I agree with John that we can leave Suppressed without a static `as` method
> for now, since it is not useful at the moment. Assuming that is agreed on, I
> think we can move on to the voting process.
> 
> 
> Guozhang
> 
> On Tue, Feb 26, 2019 at 2:56 PM John Roesler  wrote:
> 
>> Thanks for the feedback, Bill.
>>
>> It might be a good idea to hold off on adding the static method to
>> Suppressed for now...
>>
>> Unlike the other operator/config-class pairs, `suppress` has no "default"
>> mode. That is, there is no `KTable.suppress()` method with no arguments.
>> So, when using it, you must either pick `untilWindowCloses` or
>> `untilTimeLimit`, and you can also only pick one of those.
>>
>> The current `withName` method is an optional way to add a name to the
>> Suppressed config.
>>
>> Adding `Suppressed.as(name)` as another static factory method seems to have
>> some real downsides at the moment:
>> * `suppress(Suppressed.as(name))` is an invalid config, so we can either go
>> to extra lengths with an intermediate builder class just to store the name,
>> or lose the compile-time guarantee that you actually pick one of the
>> suppression types, and instead just check at run time.
>> * The two config choices are currently static factory methods, so adding
>> `as(name)` obligates us to also add chained versions of `untilWindowCloses`
>> and `untilTimeLimit`. This opens up a new can of worms to name the chained
>> methods, and it also creates more ambiguity in the API, since there are
>> then multiple ways to say the same thing.
>>
>> Maybe in the future, if there's some automatic default configuration for
>> suppression, then we can add `as(name)`, but from where I'm sitting right
>> now, it seems to have no real upside and a few downsides.
>>
>> So, to summarize, I would propose to basically leave `Suppressed`'s
>> interface alone and just extend the new `NamedOperation`.
>>
>> WDYT?
>>
>> Thanks,
>> -John
>>
>> On Tue, Feb 26, 2019 at 9:18 AM Bill Bejeck  wrote:
>>
>>> Hi Florian,
>>>
>>> Thanks for the update to the KIP.
>>>
>>> As for changing the name for "Suppressed#withName", I believe we can update
>>> the table in the KIP to say "Suppressed#as", as the KIP states that:
>>>
>>>>> In addition, we propose to add a new static method with the following
>>>>> signature to each of those classes: *as(final String processorName).*
>>>
>>> where Suppressed is one of the classes listed.
>>>
>>> So once we make that minor update to the KIP, we can start the vote.
>>>
>>> Thanks!
>>> Bill
>>>
>>>
>>>
>>> On Mon, Feb 25, 2019 at 5:24 AM Florian Hussonnois <fhussonn...@gmail.com> wrote:
>>>
 Hi Matthias, Bill,

 I've updated the KIP with your latest feedback. However, regarding your
 suggestion to rename `Suppressed#withName(String)`: withName is not a
 static method like Joined.named was. The withName method is part of the
 new interface NamedOperation.

 In addition, I've split the PR into 5 commits so that it will be much
 easier to review.

 Thanks

 On Thu, Feb 21, 2019 at 11:54 PM Bill Bejeck wrote:

> Hi Florian,
>
> Overall the KIP LGTM.  Once you've addressed the final comments from
> Matthias I think we can put this up for a vote.
>
> Thanks,
> Bill
>
> On Wed, Feb 20, 2019 at 9:42 PM Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Florian,
>>
>> thanks for updating the KIP (and no worries about the late reply -- 2.2
>> release kept us busy anyway...). Overall LGTM.
>>
>> Just some nits:
>>
>>
>> KStream-Table:
>>
>> Do we need to list the existing stream-globalTable join methods in the
>> first table (I thought it should only contain new/changing methods)?
>>
>> typo: `join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named)`
>>
>> `leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)` is missing the new
>> `Named` parameter.
>>
>> `static Joined#named(final String name)`
>>  -> should be `#as(...)` instead of `named(...)`
>>
>> flatTransform() is missing (cf. KIP-313)
>>
>>
>>
>> KTable-table:
>>
>> `Suppressed#withName(String)`
>>  -> should we change this to `#as(...)` too (similar to `named()`)
>>
>>
>>
>> -Matthias
>>
>>
>>
>> On 1/25/19 9:49 AM, Matthias J. Sax wrote:
>>> I was reading the KIP again, and there are still some open questions and
>>> inconsistencies:
>>>
>>> For example, for `KGroupedStream#count(Named)` the KIP says that only
>>> the processor will

Re: [DISCUSS] KIP-307: Allow to define custom processor names with KStreams DSL

2019-02-26 Thread Bill Bejeck
After reading John's latest response, I agree as well.

+1 (binding)

-Bill



On Tue, Feb 26, 2019 at 9:09 PM Matthias J. Sax wrote:

> Florian,
>
> Thanks for updating the KIP.
>
> I missed that `Suppressed#withName()` is not `static`, and I agree with
> John and Guozhang.
>
> I don't have any further comments. And thanks for splitting the PR!
>
>
> @Guozhang: there is already a VOTE thread.
>
>
>
> -Matthias
>
> On 2/26/19 3:26 PM, Guozhang Wang wrote:
> > Bill, John, thanks for your comments.
> >
> > I agree with John that we can leave Suppressed without a static `as` method
> > for now, since it is not useful at the moment. Assuming that is agreed on, I
> > think we can move on to the voting process.
> >
> >
> > Guozhang
> >
> > On Tue, Feb 26, 2019 at 2:56 PM John Roesler  wrote:
> >
> >> Thanks for the feedback, Bill.
> >>
> >> It might be a good idea to hold off on adding the static method to
> >> Suppressed for now...
> >>
> >> Unlike the other operator/config-class pairs, `suppress` has no "default"
> >> mode. That is, there is no `KTable.suppress()` method with no arguments.
> >> So, when using it, you must either pick `untilWindowCloses` or
> >> `untilTimeLimit`, and you can also only pick one of those.
> >>
> >> The current `withName` method is an optional way to add a name to the
> >> Suppressed config.
> >>
> >> Adding `Suppressed.as(name)` as another static factory method seems to have
> >> some real downsides at the moment:
> >> * `suppress(Suppressed.as(name))` is an invalid config, so we can either go
> >> to extra lengths with an intermediate builder class just to store the name,
> >> or lose the compile-time guarantee that you actually pick one of the
> >> suppression types, and instead just check at run time.
> >> * The two config choices are currently static factory methods, so adding
> >> `as(name)` obligates us to also add chained versions of `untilWindowCloses`
> >> and `untilTimeLimit`. This opens up a new can of worms to name the chained
> >> methods, and it also creates more ambiguity in the API, since there are
> >> then multiple ways to say the same thing.
> >>
> >> Maybe in the future, if there's some automatic default configuration for
> >> suppression, then we can add `as(name)`, but from where I'm sitting right
> >> now, it seems to have no real upside and a few downsides.
> >>
> >> So, to summarize, I would propose to basically leave `Suppressed`'s
> >> interface alone and just extend the new `NamedOperation`.
> >>
> >> WDYT?
> >>
> >> Thanks,
> >> -John
> >>
> >> On Tue, Feb 26, 2019 at 9:18 AM Bill Bejeck  wrote:
> >>
> >>> Hi Florian,
> >>>
> >>> Thanks for the update to the KIP.
> >>>
> >>> As for changing the name for "Suppressed#withName", I believe we can update
> >>> the table in the KIP to say "Suppressed#as", as the KIP states that:
> >>>
> >>>>> In addition, we propose to add a new static method with the following
> >>>>> signature to each of those classes: *as(final String processorName).*
> >>>
> >>> where Suppressed is one of the classes listed.
> >>>
> >>> So once we make that minor update to the KIP, we can start the vote.
> >>>
> >>> Thanks!
> >>> Bill
> >>>
> >>>
> >>>
> >>> On Mon, Feb 25, 2019 at 5:24 AM Florian Hussonnois <fhussonn...@gmail.com> wrote:
> >>>
>  Hi Matthias, Bill,
> 
>  I've updated the KIP with your latest feedback. However, regarding your
>  suggestion to rename `Suppressed#withName(String)`: withName is not a
>  static method like Joined.named was. The withName method is part of the
>  new interface NamedOperation.
>
>  In addition, I've split the PR into 5 commits so that it will be much
>  easier to review.
> 
>  Thanks
> 
>  On Thu, Feb 21, 2019 at 11:54 PM Bill Bejeck wrote:
> 
> > Hi Florian,
> >
> > Overall the KIP LGTM.  Once you've addressed the final comments from
> > Matthias I think we can put this up for a vote.
> >
> > Thanks,
> > Bill
> >
> > On Wed, Feb 20, 2019 at 9:42 PM Matthias J. Sax <matth...@confluent.io> wrote:
> >
> >> Florian,
> >>
> >> thanks for updating the KIP (and no worries about the late reply -- 2.2
> >> release kept us busy anyway...). Overall LGTM.
> >>
> >> Just some nits:
> >>
> >>
> >> KStream-Table:
> >>
> >> Do we need to list the existing stream-globalTable join methods in the
> >> first table (I thought it should only contain new/changing methods)?
> >>
> >> typo: `join(GlobalKTbale, KeyValueMapper, ValueJoiner, Named)`
> >>
> >> `leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)` is missing the new
> >> `Named` parameter.
> >>
> >> `static Joined#named(final String name)`
> >>  -> should be `#as(...)` instead of `named(...)`
> >>
> >> flatTransform() is missing (cf. KIP-313)
> >>
> >>
> >>
> >> KTable-table:
> >>
>