Hi,

Option 3 seems to be a good alternative, and I find the API more
elegant (thanks John).

However, we would still need to overload some methods, either because they
do not accept an action instance or because they are translated into multiple
processors.

For example, this is the case for the branch() and merge() methods. We could
introduce a new interface Named (or maybe a different name?) with a method
name(). All action interfaces could extend it to implement option 3).
This would result in the following overloads:

KStream<K, V> merge(final Named name, final KStream<K, V> stream);
KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates);

N.B.: the list above is not exhaustive.

---------
User code would become:

        KStream<String, Integer> stream = builder.stream("test");
        KStream<String, Integer>[] branches = stream.branch(
                Named.with("BRANCH-STREAM-ON-VALUE"),
                Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
                Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));

        branches[0].to("pair");
        branches[1].to("impair");
---------

This is a mix of options 3) and 1).
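
To make the idea more concrete, here is a minimal, self-contained sketch of
what a Named interface and a Predicate extending it could look like. It is
only an illustration under assumptions: Named.with(), Predicate.named() and
the helper NamingSketch.nameOrDefault() are hypothetical, and the Predicate
shown is a stand-in, not the actual Kafka Streams interface.

```java
// Hypothetical sketch only; these types are NOT the Kafka Streams API.
interface Named {
    // Returns null when the user did not provide a custom name.
    default String name() {
        return null;
    }

    // Hypothetical factory for operators that take no action instance
    // (for example branch() or merge()).
    static Named with(final String name) {
        return new Named() {
            @Override
            public String name() {
                return name;
            }
        };
    }
}

// Stand-in for an action interface; it still has a single abstract method,
// so lambdas remain valid implementations.
interface Predicate<K, V> extends Named {
    boolean test(K key, V value);

    // Wraps an existing (possibly lambda) predicate and attaches a name to it.
    static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
        return new Predicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}

final class NamingSketch {
    private NamingSketch() { }

    // Uses the custom name when one is present, otherwise the generated one.
    static String nameOrDefault(final Named named, final String generated) {
        final String custom = named.name();
        return custom != null ? custom : generated;
    }
}
```

Because name() is a default method, an anonymous lambda still works and simply
reports a null name, so the DSL could fall back to its generated name.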

On Fri, Jul 6, 2018 at 22:58, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi folks, just to summarize the options we have so far:
>
> 1) Add a new "as" for KTable / KStream, plus adding new fields for
> operators-returns-void control objects (the current wiki's proposal).
>
> Pros: no more overloads.
> Cons: departs somewhat from the current high-level API design of the DSL,
> plus the inconsistency between operators that return void and operators
> that do not.
>
> 2) Add overloaded functions for all operators that accept a new control
> object "Described".
>
> Pros: consistent with current APIs.
> Cons: lots of overloaded functions to add.
>
> 3) Add another default function in the interface (thank you J8!) as John
> proposed.
>
> Pros: no overloaded functions, no "Described".
> Cons: do we lose lambda functions really (seems not if we provide a "named"
> for each func)? Plus "Described" may be more extensible than a single
> `String`.
>
>
> My principle of considering which one is better depends primarily on "how
> to make advanced users easily use the additional API, while keeping it
> hidden from normal users who do not care at all". For that purpose I think
> 3) > 1) > 2).
>
> One caveat though, is that changing the interface would not be
> binary-compatible though source-compatible, right? I.e. users need to
> recompile their code though no changes needed.
>
>
>
> Another note: for 3), if we really want to keep extensibility of Described
> we could do sth. like:
>
> ---------
>
> public interface Predicate<K, V> {
>     // existing method
>     boolean test(final K key, final V value);
>
>     // new default method adds the ability to name the predicate
>     default Described described() {
>         return new Described(null);
>     }
> }
>
> ----------
>
> where user's code becomes:
>
> // note: `named` now just sets a Described("key") in "described()".
> stream.filter(named("key", (k, v) -> true));
>
> stream.filter(described(Described.as("key", /* any other fancy parameters in the future */), (k, v) -> true));
> ----------
>
>
> I feel it is not very likely that we'd need to extend it further in the
> future, so just a `String` would be good enough. I am just listing all the
> possibilities here.
>
>
>
> Guozhang
>
>
>
>
>
>
> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:
>
> > Hi Florian,
> >
> > Sorry I'm late to the party, but I missed the message originally.
> >
> > Regarding the names, it's probably a good idea to stick to the same
> > character set we're currently using: letters, numbers, and hyphens. The
> > names are used in Kafka topics, files and folders, and RocksDB databases,
> > and we also need them to work with the file systems of Windows, Linux, and
> > MacOS. My opinion is that with a situation like that, it's better to be
> > conservative. It might also be a good idea to impose an upper limit on name
> > length to avoid running afoul of any of those systems.
> >
> > ---
> >
> > It seems like there's a small debate between 1) adding a new method to
> > KStream (and maybe KTable) to modify its name after the fact, or 2)
> > piggy-backing on the config objects where they exist and adding one where
> > they don't. To me, #2 is the better alternative even though it produces
> > more overloads and may be a bit awkward in places.
> >
> > The reason is simply that #1 is a high-level departure from the
> > graph-building paradigm we're using in the DSL. Consider:
> >
> > Graph.node1(config).node2(config)
> >
> > vs
> >
> > Graph.node1().config().node2().config()
> >
> > We could have done either, but we picked the former. I think it's probably
> > a good goal to try and stick to it so that developers can develop and rely
> > on their instincts for how the DSL will behave.
> >
> > I do want to present one alternative to adding new config objects: we can
> > just add a "name()" method to all our "action" interfaces. For example,
> > I'll demonstrate how we can add a "name" to Predicate and then use it to
> > name a "KStream#filter" DSL operator:
> >
> > public interface Predicate<K, V> {
> >     // existing method
> >     boolean test(final K key, final V value);
> >
> >     // new default method adds the ability to name the predicate
> >     default String name() {
> >         return null;
> >     }
> >
> >     // new static factory method adds the ability to wrap lambda predicates
> >     // with a named predicate
> >     static <K, V> Predicate<K, V> named(final String name,
> >                                         final Predicate<K, V> predicate) {
> >         return new Predicate<K, V>() {
> >             @Override
> >             public boolean test(final K key, final V value) {
> >                 return predicate.test(key, value);
> >             }
> >
> >             @Override
> >             public String name() {
> >                 return name;
> >             }
> >         };
> >     }
> > }
> >
> > Then, here's how it would look to use it:
> >
> > // Anonymous predicates continue to work just fine
> > stream.filter((k, v) -> true);
> >
> > // Devs can swap in a Predicate that implements the name() method.
> > stream.filter(new Predicate<Object, Object>() {
> >     @Override
> >     public boolean test(final Object key, final Object value) {
> >         return true;
> >     }
> >
> >     @Override
> >     public String name() {
> >         return "hey";
> >     }
> > });
> >
> > // Or they can wrap their existing lambda using the static factory method
> > stream.filter(named("key", (k, v) -> true));
> >
> > Just a thought.
> >
> > Overall, I think it's really valuable to be able to name the processors,
> > for all the reasons you mentioned in the KIP. So thank you for introducing
> > this!
> >
> > Thanks,
> > -John
> >
> > On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com>
> > wrote:
> >
> > > Hi, thank you very much for all your suggestions. I've started to update
> > > the KIP (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > ).
> > > Also, I propose renaming the Processed class to Described; this will be
> > > more meaningful (but this is just a detail).
> > >
> > > I'm OK with not enforcing uppercase for specific names, but should we allow
> > > arbitrary names, with whitespace for example? Currently, I can't tell
> > > whether this could lead to side effects.
> > >
> > > On Mon, Jun 11, 2018 at 01:31, Matthias J. Sax <matth...@confluent.io>
> > > wrote:
> > >
> > > > Just catching up on this thread.
> > > >
> > > > I like the general idea. Couple of comments:
> > > >
> > > >  - I think that adding `Processed` (or maybe a different name?) is a
> > > > valid proposal for stateless operators that only have a single overload
> > > > atm. It would align with the overall API design.
> > > >
> > > >  - for all methods with multiple existing overloads, we can consider
> > > > extending `Consumed`, `Produced`, `Materialized` etc. to take an additional
> > > > processor name (not sure atm how elegant this is; we would need to
> > > > "play" with the API a little bit; the advantage would be that we do not
> > > > add more overloads, which seems to be key for this KIP)
> > > >
> > > >  - operators that return void: while I agree that the "name first" chaining
> > > > idea is not very intuitive, it might still work if we name the method
> > > > correctly (again, we would need to "play" with the API a little bit to
> > > > see)
> > > >
> > > >  - for DSL operators that are translated to multiple nodes: it might
> > > > make sense to use the specified operator name as prefix and add
> > > > reasonable suffixes. For example, a join translates into 5 operators
> > > > that could be named "name-left-store-processor",
> > > > "name-left-join-processor", "name-right-store-processor",
> > > > "name-right-join-processor", and "name-join-merge-processor" (or
> > > > similar). Maybe just using numbers might also work.
> > > >
> > > >  - I think we should strip the number suffixes if a user provides names
> > > >
> > > >  - enforcing upper case seems to be tricky: for example, we do not
> > > > enforce upper case for store names and we cannot easily change it as it
> > > > would break compatibility -- thus, for consistency reasons we might not
> > > > want to do this
> > > >
> > > >  - for a better understanding of the impact of the KIP, it would be quite
> > > > helpful if you would list all method names that are affected in the KIP
> > > > (i.e., list all newly added overloads)
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 5/31/18 6:40 PM, Guozhang Wang wrote:
> > > > > Hi Florian,
> > > > >
> > > > > Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying the
> > > > > processor name after the operator is fine as long as we do the check again
> > > > > when modifying that. In fact, we are having some topology optimization
> > > > > going on which may modify processor names in the final topology anyways (
> > > > > https://github.com/apache/kafka/pull/4983). Semantically I think it is
> > > > > easier for developers to understand than "deciding the processor name for
> > > > > the next operator".
> > > > >
> > > > > Re 2: Yeah, I'm thinking that for operators that translate to multiple
> > > > > processor names, we can still use the provided "hint" to name the
> > > > > processors, e.g. for joins we can name them `join-foo-this` and
> > > > > `join-foo-that` etc. if the user calls `as("foo")`.
> > > > >
> > > > > Re 3: The motivation I had for removing the suffix is that it puts huge
> > > > > restrictions on topology compatibility: consider if user code added a new
> > > > > operator, or the library does some optimization to remove some operators;
> > > > > the suffix indexing may be changed for a large number of the processor
> > > > > names. This will in turn change the internal state store names as well as
> > > > > the internal topic names, making the new application topology
> > > > > incompatible with the old ones. One rationale I had about this KIP is
> > > > > that, aligned with this effort, moving forward we can allow users to
> > > > > customize internal names so that they can still be reused even with
> > > > > topology changes (e.g. KIP-230), so I think removing the suffix index
> > > > > would be more applicable in the long run.
> > > > >
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi ,
> > > > >> Thank you very much for your feedback.
> > > > >>
> > > > >> 1/
> > > > > >> I agree that overloading most of the methods with a Processed is not
> > > > > >> ideal. I've started modifying the KStream API and I came to the same
> > > > > >> conclusion. Also, adding a new method directly to the KStreamImpl and
> > > > > >> KTableImpl classes seems to be a better option.
> > > > > >>
> > > > > >> However, a processor name cannot be redefined after calling an operator
> > > > > >> (or maybe I missed something in the code).
> > > > > >> From my understanding, this will only set the KStream name property, not
> > > > > >> the processor name previously added to the topology builder, leading to an
> > > > > >> InvalidTopology exception.
> > > > >>
> > > > > >> So the new method should actually define the name of the next processor.
> > > > > >> Below is an example:
> > > > > >>
> > > > > >> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
> > > > > >>       .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
> > > > >>
> > > > > >> I think this approach could solve the cases for methods returning void?
> > > > > >>
> > > > > >> Regarding this new method, we have two possible implementations:
> > > > > >>
> > > > > >>    1. Adding a method like: withName(String processorName)
> > > > > >>    2. or adding a method accepting a Processed object: as(Processed).
> > > > > >>
> > > > > >> I think solution 2. is preferable as the Processed class could be
> > > > > >> enriched further (in the future).
> > > > >>
> > > > >> 2/
> > > > > >> As Guozhang said, some operators add internal processors.
> > > > > >> For example, the branch() method creates one KStreamBranch processor to
> > > > > >> route records and one KStreamPassThrough processor for each branch.
> > > > > >> In that situation only the parent processor can be named. For child
> > > > > >> processors we could keep the current behaviour, which adds a suffix (i.e.
> > > > > >> KSTREAM-BRANCHCHILD-).
> > > > > >>
> > > > > >> This is also the case for the join() method, which results in adding
> > > > > >> multiple processors to the topology (windowing, left/right joins and a
> > > > > >> merge processor).
> > > > > >> I think that, as for the branch method, users could only define a
> > > > > >> processor name prefix.
> > > > >>
> > > > >> 3/
> > > > > >> I think we should still add a suffix like "-0000000000" to the processor
> > > > > >> name and enforce uppercase, as this will keep some consistency with the
> > > > > >> ones generated by the API.
> > > > >>
> > > > >> 4/
> > > > > >> Yes, the KTable interface should be modified like KStream to allow custom
> > > > > >> processor name definitions.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >>
> > > > > >> On Thu, May 31, 2018 at 19:18, Damian Guy <damian....@gmail.com> wrote:
> > > > >>
> > > > >>> Hi Florian,
> > > > >>>
> > > > > >>> Thanks for the KIP. What about KTable and other DSL interfaces? Will they
> > > > > >>> not want to be able to do the same thing?
> > > > > >>> It would be good to see a complete set of the public API changes.
> > > > >>>
> > > > >>> Cheers,
> > > > >>> Damian
> > > > >>>
> > > > > >>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >>>
> > > > >>>> Hello Florian,
> > > > >>>>
> > > > > >>>> Thanks for the KIP. I have some meta feedback on the proposal:
> > > > > >>>>
> > > > > >>>> 1. You mentioned that this `Processed` object will be added to a new
> > > > > >>>> overloaded variant of all the stateless operators; what about the
> > > > > >>>> stateful operators? I would like to hear your opinions if you have
> > > > > >>>> thought about that: note that stateful operators will usually be mapped
> > > > > >>>> to multiple processor node names, so we probably need to come up with
> > > > > >>>> some way to define all their names.
> > > > > >>>>
> > > > > >>>> 2. I share the same concern as Bill about adding lots of new overloaded
> > > > > >>>> functions to the stateless operators, as we have just spent quite some
> > > > > >>>> effort in trimming them since the 1.0.0 release. If the goal is to just
> > > > > >>>> provide some "hints" on the generated processor node names, not strictly
> > > > > >>>> enforcing the exact names to be generated, then how about we just add a
> > > > > >>>> new function to the `KStream` and `KTable` classes like "as(Processed)",
> > > > > >>>> with the semantics "the latest operators that generate this KStream /
> > > > > >>>> KTable will be named according to this hint".
> > > > >>>>
> > > > > >>>> The only caveat is that for all operators like `KStream#to` and
> > > > > >>>> `KStream#print` that return void, this alternative would not work. But
> > > > > >>>> for the current operators:
> > > > > >>>>
> > > > > >>>> a. KStream#print,
> > > > > >>>> b. KStream#foreach,
> > > > > >>>> c. KStream#to,
> > > > > >>>> d. KStream#process
> > > > > >>>>
> > > > > >>>> I personally felt that, except for `KStream#process`, users would not
> > > > > >>>> usually bother to override their names, and for `KStream#process` we
> > > > > >>>> could add an overloaded variant with the additional Processed object.
> > > > >>>>
> > > > >>>>
> > > > > >>>> 3. In your example, the processor names are still added with a suffix
> > > > > >>>> like "-0000000000"; is this intentional? If yes, why (I thought with
> > > > > >>>> user specified processor name hints we would not add a suffix to
> > > > > >>>> distinguish different nodes of the same type any more)?
> > > > >>>>
> > > > >>>>
> > > > >>>> Guozhang
> > > > >>>>
> > > > >>>>
> > > > > >>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> Hi Florian,
> > > > >>>>>
> > > > > >>>>> Thanks for the KIP.  I think being able to add more context to the
> > > > > >>>>> processor names would be useful.
> > > > > >>>>>
> > > > > >>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed
> > > > > >>>>> and Joined.
> > > > > >>>>>
> > > > > >>>>> But instead of adding the "Processed" parameter to a large percentage of
> > > > > >>>>> the methods, which would result in overloaded methods (which we removed
> > > > > >>>>> quite a bit with KIP-182), what do you think of adding a method
> > > > > >>>>> "withName(String processorName)" to the AbstractStream class? BTW I'm
> > > > > >>>>> not married to the method name, it's the best I can do off the top of my
> > > > > >>>>> head.
> > > > > >>>>>
> > > > > >>>>> For the methods that return void, we'd have to add a parameter, but that
> > > > > >>>>> would at least cut down on the number of overloaded methods in the API.
> > > > >>>>>
> > > > >>>>> Just my 2 cents.
> > > > >>>>>
> > > > >>>>> Thanks,
> > > > >>>>> Bill
> > > > >>>>>
> > > > > >>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com>
> > > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi,
> > > > >>>>>>
> > > > > >>>>>> I would like to start a new discussion on the following KIP:
> > > > > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > > > >>>>>>
> > > > > >>>>>> This is still a draft.
> > > > > >>>>>>
> > > > > >>>>>> Looking forward to your feedback.
> > > > >>>>>> --
> > > > >>>>>> Florian HUSSONNOIS
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>>
> > > > >>>>
> > > > >>>> --
> > > > >>>> -- Guozhang
> > > > >>>>
> > > > >>>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Florian HUSSONNOIS
> > > > >>
> > > > >
> > > > >
> > > > >
> > > >
> > > >
> > >
> > > --
> > > Florian HUSSONNOIS
> > >
> >
>
>
>
> --
> -- Guozhang
>


-- 
Florian HUSSONNOIS
