Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-20 Thread Matthias J. Sax
Thanks, Paul!

On 8/15/19 7:28 PM, Paul Whalen wrote:
> I updated the KIP (and PR) to relax the restriction on connecting state
> stores via either means; it definitely makes sense to me at this point.
> I'd love to hear if there are any other concerns or broad objections to the
> KIP.
> 
> Paul

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-15 Thread Paul Whalen
I updated the KIP (and PR) to relax the restriction on connecting state
stores via either means; it definitely makes sense to me at this point.
I'd love to hear if there are any other concerns or broad objections to the
KIP.

Paul

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-08 Thread Paul Whalen
Matthias,

You did summarize my thinking correctly; thanks for writing it out.  I
think the difference in opinion is due to a couple of things influenced by my
habits while writing Streams code:

1) I don't see state stores that are "individually owned" versus "shared"
as that much different at all, at least from the perspective of the
business logic for the Processor. So it is actually a negative to separate
the connecting of stores, because it appears in the topology wiring that
fewer stores are being used by the Processor than actually are.  A reader
might assume that the Processor doesn't need other state to do its job
which could cause confusion.
2) In practice, my addProcessor() and addStateStore() (or
builder.addStateStore() and stream.process()) calls are very near each
other anyway, so the shared dependency on StoreBuilder is not a burden;
passing the same object could even bring clarity to the idea that the store
is shared and not individually owned.

Hearing your thoughts though, I think I have imposed a bit too much of my
own style and assumptions on the API, especially with the shared dependency
on a single StoreBuilder and thoughts about store ownership/sharing.  I'm
going to update the KIP, since the one +1 vote comes from John, who is in
favor of relaxing the restriction anyway.

Paul

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-07 Thread Matthias J. Sax
I am not sure I fully understand, so let me try to rephrase:

> I can't think of an example that would require both ways, or would
> even be more readable using both ways.

Example:

There are two processors, A and B, one store S that both need to
access, and one store S_b that only B needs to access:

If we don't allow mixing both approaches, we would need to write
the following code:

  Topology t = new Topology();
  t.addProcessor("A", ...); // does not add any store
  t.addProcessor("B", ...); // does not add any store
  t.addStateStore(..., "A", "B"); // adds S and connects it to A and B
  t.addStateStore(..., "B"); // adds S_b and connects it to B

// DSL example:

  StreamsBuilder b = new StreamsBuilder();
  b.addStateStore(...); // adds S
  b.addStateStore(...); // adds S_b
  stream1.process(..., "S"); // adds A and connects S
  stream2.process(..., "S", "S_b"); // adds B and connects S and S_b


If we allow mixing both approaches, the code could be simplified to:

  Topology t = new Topology();
  t.addProcessor("A", ...); // does not add any store
  t.addProcessor("B", ...); // adds/connects S_b implicitly
  t.addStateStore(..., "A", "B"); // adds S and connects it to A and B

// DSL example

  StreamsBuilder b = new StreamsBuilder();
  b.addStateStore(...); // adds S
  stream1.process(..., "S"); // adds A and connects S
  stream2.process(..., "S"); // adds B and connects S; adds/connects S_b implicitly

The fact that B has a "private store" could be encapsulated, and I don't
see why this would be bad.
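For illustration, a minimal sketch of the mixed wiring above in a hypothetical toy model, not the Kafka Streams API: `ToyTopology`, `ToySupplier`, and `ownedStores()` are invented names, and stores are plain strings. B's supplier declares its private store S_b, the shared store S is still connected by name, and each processor ends up with the union of both.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical toy model, NOT the Kafka Streams API: it only records which
// store names end up connected to which processor.
class ToyTopology {
    // Hypothetical supplier that may declare stores it owns privately.
    interface ToySupplier {
        default Set<String> ownedStores() {
            return Collections.emptySet();
        }
    }

    private final Map<String, Set<String>> connected = new LinkedHashMap<>();

    // Plays the role of addProcessor(): implicitly connects the supplier's own stores.
    void addProcessor(String name, ToySupplier supplier) {
        connected.computeIfAbsent(name, k -> new TreeSet<>()).addAll(supplier.ownedStores());
    }

    // Plays the role of addStateStore(..., processorNames): explicit wiring by name.
    void addStateStore(String store, String... processorNames) {
        for (String p : processorNames) {
            connected.computeIfAbsent(p, k -> new TreeSet<>()).add(store);
        }
    }

    Set<String> storesOf(String processor) {
        return connected.getOrDefault(processor, Collections.<String>emptySet());
    }

    // The mixed example: S is shared and wired by name, S_b is private to B.
    static ToyTopology mixedExample() {
        ToyTopology t = new ToyTopology();
        t.addProcessor("A", new ToySupplier() { });   // owns no store
        t.addProcessor("B", new ToySupplier() {       // owns S_b
            @Override
            public Set<String> ownedStores() {
                return Collections.singleton("S_b");
            }
        });
        t.addStateStore("S", "A", "B");               // shared store S
        return t;
    }
}
```

In this sketch, `ToyTopology.mixedExample().storesOf("B")` contains both S and S_b while A only sees S; the fact that S_b belongs to B stays inside B's supplier.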

> If you can
> do both ways, the actual full set of state stores being connected could be
> in wildly different places in the code, which could create confusion.

I.e., I don't see why the second version would be confusing, or why the
first version would be more readable (I am not arguing it's less readable
either, though; I think both are equally readable).



Or are you arguing that we should allow the following:

> Shared stores can be passed from
> the outside in an anonymous ProcessorSupplier if desired, making it
> effectively the same as passing the stateStoreNames var args

  Topology t = new Topology();
  t.addProcessor("A", ...); // adds/connects S implicitly
  t.addProcessor("B", ...); // adds/connects S and S_b implicitly

// DSL example

  StreamsBuilder b = new StreamsBuilder();
  stream1.process(...); // adds A and adds/connects S implicitly
  stream2.process(...); // adds B and adds/connects S and S_b implicitly

In this case, the second implicit addition of S would require returning
the same `StoreBuilder` instance to keep the operation idempotent, which
seems hard to achieve, because both `ProcessorSupplier`s now have a
cross-dependency on the same object.

Hence, I don't think this would be a good approach.


Also, because we require that a given store name is always paired with the
same `StoreBuilder` instance, we actually have good protection against a user
bug that adds two stores with the same name but different builders.
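A minimal sketch of that protection, assuming a hypothetical toy registry rather than Kafka's internal topology builder (`StoreRegistrySketch` and `ToyStoreBuilder` are invented names): adding the identical builder instance twice is a no-op, while a second, different builder under an existing name is rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy registry, NOT Kafka's internal topology builder: it shows
// the rule "same name + same builder instance is fine, same name + different
// builder is an error".
class StoreRegistrySketch {
    // Hypothetical stand-in for a StoreBuilder; only the name matters here.
    static final class ToyStoreBuilder {
        final String name;

        ToyStoreBuilder(String name) {
            this.name = name;
        }
    }

    private final Map<String, ToyStoreBuilder> stores = new HashMap<>();

    // Re-adding the identical instance is a no-op (idempotent).
    void addStateStore(ToyStoreBuilder builder) {
        ToyStoreBuilder existing = stores.putIfAbsent(builder.name, builder);
        // Reference comparison on purpose: two builders that merely share a
        // name may still configure different stores, so they are rejected.
        if (existing != null && existing != builder) {
            throw new IllegalStateException(
                "Store '" + builder.name + "' was already added with a different builder");
        }
    }

    int size() {
        return stores.size();
    }
}
```

The identity check is the design choice that makes "add the same store again" safe: a shared builder passed to several call sites is accepted, while two independently constructed builders for S fail fast instead of silently diverging.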


I also don't feel super strongly about it, but I see some advantages to
allowing the mixed approach, and I don't see disadvantages. It would be good
to get input from others, too.



-Matthias


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-07 Thread Paul Whalen
My thinking was that restricting the API to enforce only one way of connecting
stores would make it simpler to use and lead to more readable code.  I can't
think of an example that would require both ways, or would even be more
readable using both ways.  Shared stores can be passed from the outside into
an anonymous ProcessorSupplier if desired, making it effectively the same as
passing the stateStoreNames varargs.  If you can
do both ways, the actual full set of state stores being connected could be
in wildly different places in the code, which could create confusion.  I
personally can't imagine a case in which that would be useful.

All that being said, I don't feel terribly strongly about it.  I'm just
trying to make the API as straightforward as possible.  Admittedly a
runtime check doesn't make for a great API, but I see it more as an
opportunity to educate the user to make it clear that "connecting" a state
store is a thing that can be done in two different ways, but there is no
reason to mix both.  If it seems like there's a compelling reason to mix
them then I would abandon the idea in a heartbeat.
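To make the proposed restriction concrete, here is a hypothetical sketch of such a runtime check. It is not taken from the PR, the names are invented, and supplier-declared stores are modeled as a plain set of names; `null` or empty means the supplier declares nothing.

```java
import java.util.Set;

// Hypothetical sketch of the proposed "one way only" runtime check; it is not
// taken from the PR, and the thread later decided to drop this restriction.
class OneWayOnlyCheck {
    static void validate(String processorName, Set<String> supplierStores, String... stateStoreNames) {
        // null or empty means the supplier does not declare any stores itself.
        boolean viaSupplier = supplierStores != null && !supplierStores.isEmpty();
        boolean viaNames = stateStoreNames.length > 0;
        if (viaSupplier && viaNames) {
            throw new IllegalArgumentException(
                "Processor '" + processorName + "' connects stores both via its supplier "
                    + supplierStores + " and by name; pick one approach");
        }
    }
}
```

Either route alone passes; only combining them fails, which is exactly the case (one owned store plus one shared store) that the rest of the thread argues should be allowed.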

Paul

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-08-07 Thread Matthias J. Sax
Sorry for the long silence on this KIP, Paul! I guess the 2.3 release
distracted us somewhat.

Overall, I am +1.

With regard to John's point about owned vs. shared state stores, I think
it describes a valid use case, and throwing an exception if people want
to mix both features might be too restrictive?

We could, of course, relax the restriction later, but at the moment I am not
sure what the main argument for adding the restriction is.

(a) In the current API, one could connect the same store multiple times
to the same processor without getting an exception, because the
operation is idempotent.

(b) The KIP also suggests relaxing the current restriction on adding the
same store twice, as long as the store name and `StoreBuilder` instance are
the same, because that is an idempotent (hence, safe) operation too.

Because we already have (a) and (b) and consider both safe, it seems
we could also treat mixing both patterns as idempotent and hence
safe. And if we do this, we implicitly enable mixing both patterns for
different stores.


Thoughts?


-Matthias



On 6/17/19 2:31 PM, John Roesler wrote:
> Hey, all,
> 
> Sorry I'm late to the party. I meant to read into this KIP before, but
> didn't get around to it. I was just reminded when Paul mentioned it in
> a different thread. Please feel free to bump a discussion any time it
> stalls!
> 
> I've just read through the whole discussion so far, and, to echo the
> earlier sentiments, the motivation seems very clear. I remember how
> hard it was to figure out how to actually wire up a stateful processor
> properly the first couple of times. Not a very good user experience.
> 
> I looked over the whole conversation to date, as well as the KIP and
> the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, the
> current approach looks good to me. I was concerned about the "cheat
> codes"-style mixin interface. Discoverability would have been a
> problem, and it's also not a very normal pattern for Java APIs. It
> actually looks a little more like something you'd do with an
> annotation.
> 
> So the current approach seems good:
> * The new interface with a default to return `null` is effectively
> shipping the feature flagged "off" (which is nice and safe)
> * Shared stores are "supported" the same way they always have been, by
> connecting them externally. This makes sense, since those stores
> aren't "owned" by any of the connected processors.
> * Processors that do own their stores can configure them in the same
> file where they use them, which decreases the probability of cast exceptions
> when they get the stores from the context.
> * Stateful processors that own their stores are available for one-shot
> definition of the stores and the processor all in the same file (this
> is the main point of the KIP)
> 
> The runtime check that stores can't be both defined in the processor
> and referenced by name might be a little restrictive (since we already
> have the restriction that same-name stores can't be registered), but
> it would also be easy to remove it later. I'm just thinking that if I
> have a processor that owns one store and shares another, it would be
> pretty obvious how to hook it up in the proposed API, except for that
> check.
> 
> One last thought, regarding the all-important interface name: If you
> wanted to indicate more that the stores are available for Streams to
> connect, rather than that they are already connected, you could call
> it ConnectableStoreProvider (similar to AutoCloseable).
> 
> I just thought I'd summarize the current state, since it's been a
> while and no one has voted yet. I'll go ahead and vote now on the
> voting thread, since I'm +1 on the current proposal.
> 
> Thanks,
> -John
> 
> On Mon, May 27, 2019 at 1:59 PM Paul Whalen  wrote:
>>
>> It wasn't much of a lift changing option B to work for option C, so I
>> closed that PR and made a new one, which should be identical to the KIP
>> right now: https://github.com/apache/kafka/pull/6824.  There are still a few
>> TODOs, which I will hold off on until the KIP is accepted.
>>
>> I created a voting thread about a month ago, so I'll bump that now that
>> we're nearly there.
>>
>> Paul
>>
>> On Sun, May 26, 2019 at 2:21 PM Paul Whalen  wrote:
>>
>>> Per Matthias's suggestion from a while ago, I actually implemented a good
>>> amount of option B to get a sense of the user experience and documentation
>>> requirements.  For a few reasons mentioned below, I think it's not my
>>> favorite option, and I prefer option C.  But since I did the work and it
>>> can help discussion, I may as well share:
>>> https://github.com/apache/kafka/pull/6821.
>>>
>>> Things I learned along the way implementing Option B:
>>>  - For the name of the interface, I like ConnectedStoreProvider.  It isn't
>>> perfect but it seems to capture the general gist without being overly
>>> verbose.  I get that from a strict standpoint it's not "providing connected
>>> stores" but is instead "providing stores to be connected

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-06-17 Thread John Roesler
Hey, all,

Sorry I'm late to the party. I meant to read into this KIP before, but
didn't get around to it. I was just reminded when Paul mentioned it in
a different thread. Please feel free to bump a discussion any time it
stalls!

I've just read through the whole discussion so far, and, to echo the
earlier sentiments, the motivation seems very clear. I remember how
hard it was to figure out how to actually wire up a stateful processor
properly the first couple of times. Not a very good user experience.

I looked over the whole conversation to date, as well as the KIP and
the latest PR (https://github.com/apache/kafka/pull/6824). FWIW, the
current approach looks good to me. I was concerned about the "cheat
codes"-style mixin interface. Discoverability would have been a
problem, and it's also not a very normal pattern for Java APIs. It
actually looks a little more like something you'd do with an
annotation.

So the current approach seems good:
* The new interface with a default to return `null` is effectively
shipping the feature flagged "off" (which is nice and safe)
* Shared stores are "supported" the same way they always have been, by
connecting them externally. This makes sense, since those stores
aren't "owned" by any of the connected processors.
* Processors that do own their stores can configure them in the same
file where they use them, which decreases the probability of cast exceptions
when they get the stores from the context.
* Stateful processors that own their stores are available for one-shot
definition of the stores and the processor all in the same file (this
is the main point of the KIP)
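To make the "one-shot definition" point concrete, here is a minimal, self-contained Java sketch. It deliberately does not use Kafka Streams: `Supplier`, `Processor`, `Context` and `StoreSpec` are hypothetical stand-ins for `ProcessorSupplier`, `Processor`, `ProcessorContext` and `StoreBuilder`, and `run()` is a toy runtime. It only illustrates the idea that when a supplier declares the store it owns, the store name is defined once in the same class that reads it, while a supplier that does not override the default still returns `null` (feature "off").

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins only: Supplier ~ ProcessorSupplier, StoreSpec ~ StoreBuilder,
// Context ~ ProcessorContext. None of these are Kafka Streams types.
public class OneShotSupplierSketch {

    static final class StoreSpec {
        final String name;
        StoreSpec(String name) { this.name = name; }
    }

    interface Context {
        Map<String, Long> store(String name); // typed lookup in this toy model
    }

    interface Processor {
        void init(Context context);
        void process(String key);
    }

    interface Supplier {
        Processor get();
        // Default returns null: suppliers that do not override it keep the feature "off".
        default Set<StoreSpec> stores() { return null; }
    }

    // The store and the processor that owns it are defined together, so the
    // store name is declared once and cannot drift between the two.
    static final class WordCountSupplier implements Supplier {
        static final String STORE = "word-counts";

        @Override
        public Set<StoreSpec> stores() {
            return Collections.singleton(new StoreSpec(STORE));
        }

        @Override
        public Processor get() {
            return new Processor() {
                private Map<String, Long> counts;

                @Override
                public void init(Context context) { counts = context.store(STORE); }

                @Override
                public void process(String key) { counts.merge(key, 1L, Long::sum); }
            };
        }
    }

    // Toy runtime: creates every store the supplier declares, then feeds keys to the processor.
    static Map<String, Map<String, Long>> run(Supplier supplier, String... keys) {
        Map<String, Map<String, Long>> stores = new HashMap<>();
        Set<StoreSpec> declared = supplier.stores();
        if (declared != null) {
            for (StoreSpec spec : declared) {
                stores.put(spec.name, new HashMap<>());
            }
        }
        Processor processor = supplier.get();
        processor.init(stores::get);
        for (String key : keys) {
            processor.process(key);
        }
        return stores;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> stores = run(new WordCountSupplier(), "a", "b", "a");
        System.out.println(stores.get(WordCountSupplier.STORE).get("a")); // prints 2
    }
}
```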

The runtime check that stores can't be both defined in the processor
and referenced by name might be a little restrictive (since we already
have the restriction that same-name stores can't be registered), but
it would also be easy to remove it later. I'm just thinking that if I
have a processor that owns one store and shares another, it would be
pretty obvious how to hook it up in the proposed API, except for that
check.
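The check described above, and the relaxed alternative, might look roughly like this. It is a hypothetical illustration: store names stand in for store builders, and `strict()`/`relaxed()` are invented names, not methods from the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch of the two validation rules; store names stand in for
// StoreBuilders, and strict()/relaxed() are illustrative names, not PR code.
public class StoreConnectionCheck {

    // Strict rule: stores come from the supplier OR are referenced by name, never both.
    static Set<String> strict(Set<String> providedStores, String... namedStores) {
        boolean hasProvided = providedStores != null && !providedStores.isEmpty();
        if (hasProvided && namedStores.length > 0) {
            throw new IllegalArgumentException(
                "stores must either be provided by the supplier or referenced by name, not both");
        }
        return relaxed(providedStores, namedStores);
    }

    // Relaxed rule: owned stores come from the supplier, shared stores are
    // referenced by name, and the processor is connected to the union of both.
    static Set<String> relaxed(Set<String> providedStores, String... namedStores) {
        Set<String> toConnect = new LinkedHashSet<>();
        if (providedStores != null) {
            toConnect.addAll(providedStores);
        }
        toConnect.addAll(Arrays.asList(namedStores));
        return toConnect;
    }

    public static void main(String[] args) {
        Set<String> owned = Collections.singleton("owned-store");
        System.out.println(relaxed(owned, "shared-store")); // prints [owned-store, shared-store]
        try {
            strict(owned, "shared-store");
        } catch (IllegalArgumentException e) {
            System.out.println("strict rule rejected the mix");
        }
    }
}
```

Under the relaxed rule, a processor that owns one store and shares another simply gets both connected; registering two stores with the same name would still be caught by the existing same-name restriction.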

One last thought, regarding the all-important interface name: If you
wanted to indicate more that the stores are available for Streams to
connect, rather than that they are already connected, you could call
it ConnectableStoreProvider (similar to AutoCloseable).

I just thought I'd summarize the current state, since it's been a
while and no one has voted yet. I'll go ahead and vote now on the
voting thread, since I'm +1 on the current proposal.

Thanks,
-John

On Mon, May 27, 2019 at 1:59 PM Paul Whalen  wrote:
>
> It wasn't much of a lift changing option B to work for option C, so I
> closed that PR and made a new one, which should be identical to the KIP
> right now: https://github.com/apache/kafka/pull/6824.  There are a few
> todos still which I will hold off on until the KIP is accepted.
>
> I created a voting thread about a month ago, so I'll bump that now that
> we're nearly there.
>
> Paul
>
> On Sun, May 26, 2019 at 2:21 PM Paul Whalen  wrote:
>
> > Per Matthias's suggestion from a while ago, I actually implemented a good
> > amount of option B to get a sense of the user experience and documentation
> > requirements.  For a few reasons mentioned below, I think it's not my
> > favorite option, and I prefer option C.  But since I did the work and it
> > can help discussion, I may as well share:
> > https://github.com/apache/kafka/pull/6821.
> >
> > Things I learned along the way implementing Option B:
> >  - For the name of the interface, I like ConnectedStoreProvider.  It isn't
> > perfect but it seems to capture the general gist without being overly
> > verbose.  I get that from a strict standpoint it's not "providing connected
> > stores" but is instead "providing stores to be connected," but I think that
> > in context and with documentation, the risk of someone being confused by
> > that is low.
> >  - I definitely felt the discoverability issue while trying to write clear
> > documentation; you really have to make sure to connect the dots for the
> > user when the interface isn't connected to anything.
> >  - Another problem with a separate interface found while writing
> > tests/examples: defining a ProcessorSupplier that also implements
> > ConnectedStoreProvider cannot be done anonymously, since you can't define
> > an anonymous class in Java that implements multiple interfaces.  I actually
> > consider this a fairly major usability issue - it means a user always has
> > to have a custom class rather than doing it inline.  We could provide an
> > abstract class that implements the two, but at that point, we're not that
> > far from option A or C anyway.
> >
> > I updated the KIP with my current thinking, which as mentioned is
> > Matthias's option C.  Once again for clarity, that *is not* what is in the
> > linked pull request.  The current KIP is my proposal.
> >
> > Thanks everyone for the input!
> >
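> > P.S.  What do folks use to edit the HTML documentation, e.g.
> > processor-api.html?  I looked at doing it by hand but it kind of looked
> > like agony with all the small tags required for formatting code, so I'm
> > sort of assuming there's tooling for it.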

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-05-27 Thread Paul Whalen
It wasn't much of a lift changing option B to work for option C, so I
closed that PR and made a new one, which should be identical to the KIP
right now: https://github.com/apache/kafka/pull/6824.  There are a few
todos still which I will hold off on until the KIP is accepted.

I created a voting thread about a month ago, so I'll bump that now that
we're nearly there.

Paul

On Sun, May 26, 2019 at 2:21 PM Paul Whalen  wrote:

> Per Matthias's suggestion from a while ago, I actually implemented a good
> amount of option B to get a sense of the user experience and documentation
> requirements.  For a few reasons mentioned below, I think it's not my
> favorite option, and I prefer option C.  But since I did the work and it
> can help discussion, I may as well share:
> https://github.com/apache/kafka/pull/6821.
>
> Things I learned along the way implementing Option B:
>  - For the name of the interface, I like ConnectedStoreProvider.  It isn't
> perfect but it seems to capture the general gist without being overly
> verbose.  I get that from a strict standpoint it's not "providing connected
> stores" but is instead "providing stores to be connected," but I think that
> in context and with documentation, the risk of someone being confused by
> that is low.
>  - I definitely felt the discoverability issue while trying to write clear
> documentation; you really have to make sure to connect the dots for the
> user when the interface isn't connected to anything.
>  - Another problem with a separate interface found while writing
> tests/examples: defining a ProcessorSupplier that also implements
> ConnectedStoreProvider cannot be done anonymously, since you can't define
> an anonymous class in Java that implements multiple interfaces.  I actually
> consider this a fairly major usability issue - it means a user always has
> to have a custom class rather than doing it inline.  We could provide an
> abstract class that implements the two, but at that point, we're not that
> far from option A or C anyway.
>
> I updated the KIP with my current thinking, which as mentioned is
> Matthias's option C.  Once again for clarity, that *is not* what is in the
> linked pull request.  The current KIP is my proposal.
>
> Thanks everyone for the input!
>
> P.S.  What do folks use to edit the HTML documentation, e.g.
> processor-api.html?  I looked at doing it by hand but it kind of looked
> like agony with all the small tags required for formatting code, so I'm
> sort of assuming there's tooling for it.
>
> On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax 
> wrote:
>
>> I think the discussion mixed approaches a little bit, hence, let me
>> rephrase my understanding:
>>
>>
>> A) add new method with default implementation to `ProcessorSupplier`:
>>
>> For this case, we don't add a new interface, but only add a new method
>> to `ProcessorSupplier` -- to keep backward compatibility, we need to add
>> a default implementation. Users opt into the new feature by overriding
>> the default implementation.
>>
>>
>> B) We add a new interface with new method:
>>
>> For this case, the `ProcessorSupplier` interface is not changed and it also
>> does _not_ extend the new interface. Because `ProcessorSupplier` is not
>> changed, it's naturally backward compatible. Users opt into the new
>> feature by adding the new interface to their ProcessorSupplier
>> implementation, and they need to implement the new method because there
>> is no default implementation. Kafka Streams can use `instanceof` to
>> detect whether the new interface is used and thus do the right thing.
>>
>>
>> What was also discussed is a mix of both:
>>
>> C) We add a new interface with new method and let `ProcessorSupplier`
>> extend the new interface:
>>
>> Here, we need to add a default implementation to preserve backward
>> compatibility. Similar to (A), users opt into the feature by overriding
>> the default implementation.
>>
>>
>>
>> Option (C) is the same as (A) from a user point of view because a user
>> won't care about the new interface. It only makes a difference for our
>> code base, as we can share the default implementation of the new method.
>> This is only a small gain, as the implementation is trivial, but also a
>> small drawback, as we add a new public interface that is useless to the
>> user because the user would never implement the interface directly.
>>
>>
>>
>> For (A/C), it might be simpler for users to detect the feature. For (B),
>> we have the advantage that users must implement the method if they use
>> the new interface.
>>
>> Overall, it seems that (A) might be the best choice because it makes the
>> feature more easily discoverable and does not add a "useless" interface. If
>> you want to go with (C) to share the default implementation code, that's
>> also fine with me. I am convinced now (even though I brought it up) that
>> (B) might not be optimal, because feature discoverability seems to be
>> important.
>>
>>
>>
>>
>> About `null` vs `emptyList`: I still tend to like `null` better but it's
>> really a detail and not too important.

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-05-26 Thread Paul Whalen
Per Matthias's suggestion from a while ago, I actually implemented a good
amount of option B to get a sense of the user experience and documentation
requirements.  For a few reasons mentioned below, I think it's not my
favorite option, and I prefer option C.  But since I did the work and it
can help discussion, I may as well share:
https://github.com/apache/kafka/pull/6821.

Things I learned along the way implementing Option B:
 - For the name of the interface, I like ConnectedStoreProvider.  It isn't
perfect but it seems to capture the general gist without being overly
verbose.  I get that from a strict standpoint it's not "providing connected
stores" but is instead "providing stores to be connected," but I think that
in context and with documentation, the risk of someone being confused by
that is low.
 - I definitely felt the discoverability issue while trying to write clear
documentation; you really have to make sure to connect the dots for the
user when the interface isn't connected to anything.
 - Another problem with a separate interface found while writing
tests/examples: defining a ProcessorSupplier that also implements
ConnectedStoreProvider cannot be done anonymously, since you can't define
an anonymous class in Java that implements multiple interfaces.  I actually
consider this a fairly major usability issue - it means a user always has
to have a custom class rather than doing it inline.  We could provide an
abstract class that implements the two, but at that point, we're not that
far from option A or C anyway.
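The anonymous-class limitation is easy to reproduce in plain Java. The sketch below uses hypothetical stand-ins (`Supplier` for `ProcessorSupplier`, `StoreProvider` for the proposed `ConnectedStoreProvider`) and shows the two workarounds mentioned: a named (here, local) class, or an abstract class that combines both interfaces and can then be subclassed anonymously.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins for option B: Supplier ~ ProcessorSupplier,
// StoreProvider ~ the proposed ConnectedStoreProvider. Not Kafka Streams types.
public class AnonymousSupplierSketch {

    interface Processor { void process(String key, String value); }
    interface Supplier { Processor get(); }
    interface StoreProvider { Set<String> stores(); }

    // An anonymous class can name only ONE supertype, so something like
    //   new Supplier, StoreProvider() { ... }
    // does not compile.

    // Workaround 1: a named (here, local) class that implements both interfaces.
    static Supplier withLocalClass() {
        class CountingSupplier implements Supplier, StoreProvider {
            @Override public Processor get() { return (key, value) -> { }; }
            @Override public Set<String> stores() { return Collections.singleton("counts"); }
        }
        return new CountingSupplier();
    }

    // Workaround 2: an abstract class combining both interfaces, which can then
    // be subclassed anonymously at the call site.
    abstract static class StoreProvidingSupplier implements Supplier, StoreProvider { }

    static Supplier withAbstractClass() {
        return new StoreProvidingSupplier() {
            @Override public Processor get() { return (key, value) -> { }; }
            @Override public Set<String> stores() { return Collections.singleton("counts"); }
        };
    }

    public static void main(String[] args) {
        System.out.println(withLocalClass() instanceof StoreProvider);    // prints true
        System.out.println(withAbstractClass() instanceof StoreProvider); // prints true
    }
}
```

Either way the user needs an extra named type, which is the usability cost described above.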

I updated the KIP with my current thinking, which as mentioned is
Matthias's option C.  Once again for clarity, that *is not* what is in the
linked pull request.  The current KIP is my proposal.

Thanks everyone for the input!

P.S.  What do folks use to edit the HTML documentation, e.g.
processor-api.html?  I looked at doing it by hand but it kind of looked
like agony with all the small tags required for formatting code, so I'm
sort of assuming there's tooling for it.

On Fri, May 24, 2019 at 12:49 AM Matthias J. Sax 
wrote:

> I think the discussion mixed approaches a little bit, hence, let me
> rephrase my understanding:
>
>
> A) add new method with default implementation to `ProcessorSupplier`:
>
> For this case, we don't add a new interface, but only add a new method
> to `ProcessorSupplier` -- to keep backward compatibility, we need to add
> a default implementation. Users opt into the new feature by overriding
> the default implementation.
>
>
> B) We add a new interface with new method:
>
> For this case, the `ProcessorSupplier` interface is not changed and it also
> does _not_ extend the new interface. Because `ProcessorSupplier` is not
> changed, it's naturally backward compatible. Users opt into the new
> feature by adding the new interface to their ProcessorSupplier
> implementation, and they need to implement the new method because there
> is no default implementation. Kafka Streams can use `instanceof` to
> detect whether the new interface is used and thus do the right thing.
>
>
> What was also discussed is a mix of both:
>
> C) We add a new interface with new method and let `ProcessorSupplier`
> extend the new interface:
>
> Here, we need to add a default implementation to preserve backward
> compatibility. Similar to (A), users opt into the feature by overriding
> the default implementation.
>
>
>
> Option (C) is the same as (A) from a user point of view because a user
> won't care about the new interface. It only makes a difference for our
> code base, as we can share the default implementation of the new method.
> This is only a small gain, as the implementation is trivial, but also a
> small drawback, as we add a new public interface that is useless to the
> user because the user would never implement the interface directly.
>
>
>
> For (A/C), it might be simpler for users to detect the feature. For (B),
> we have the advantage that users must implement the method if they use
> the new interface.
>
> Overall, it seems that (A) might be the best choice because it makes the
> feature more easily discoverable and does not add a "useless" interface. If
> you want to go with (C) to share the default implementation code, that's
> also fine with me. I am convinced now (even though I brought it up) that
> (B) might not be optimal, because feature discoverability seems to be
> important.
>
>
>
>
> About `null` vs `emptyList`: I still tend to like `null` better but it's
> really a detail and not too important. Note that the question only
> arises for (A/C), but not for (B) because for (B) we don't need a
> default implementation.
>
>
>
>
> @Paul: It's unclear to me atm what your final proposal is because you
> mentioned that you might want to rename `StateStoreConnector`? It's also
> unclear to me atm, if you prefer (A), (B), or (C).
>
> Maybe you can update the KIP if necessary and clearly state what your
> final proposal is. Besides this, it seems we can move to a VOTE?
>
>
>
> -Matthias

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-05-23 Thread Matthias J. Sax
I think the discussion mixed approaches a little bit, hence, let me
rephrase my understanding:


A) add new method with default implementation to `ProcessorSupplier`:

For this case, we don't add a new interface, but only add a new method
to `ProcessorSupplier` -- to keep backward compatibility, we need to add
a default implementation. Users opt into the new feature by overriding
the default implementation.


B) We add a new interface with new method:

For this case, the `ProcessorSupplier` interface is not changed and it also
does _not_ extend the new interface. Because `ProcessorSupplier` is not
changed, it's naturally backward compatible. Users opt into the new
feature by adding the new interface to their ProcessorSupplier
implementation, and they need to implement the new method because there
is no default implementation. Kafka Streams can use `instanceof` to
detect whether the new interface is used and thus do the right thing.


What was also discussed is a mix of both:

C) We add a new interface with new method and let `ProcessorSupplier`
extend the new interface:

Here, we need to add a default implementation to preserve backward
compatibility. Similar to (A), users opt into the feature by overriding
the default implementation.
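The three shapes above can be sketched roughly as follows. These are hypothetical stand-in interfaces (store builders reduced to store names), not the real Kafka Streams types. The sketch shows that existing lambda-based suppliers keep compiling under all three options, and that under (B) the library has to detect the opt-in with `instanceof`.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical stand-ins that mirror the shape of options A, B and C; store
// builders are reduced to store names. These are not the Kafka Streams types.
public class SupplierOptionsSketch {

    interface Processor { void process(String key, String value); }

    // (A) New default method directly on the existing supplier interface.
    interface SupplierA {
        Processor get();
        default Set<String> stores() { return null; }
    }

    // (B) Separate interface without a default; the supplier is unchanged and
    // users opt in by implementing both.
    interface StoreProviderB { Set<String> stores(); }
    interface SupplierB { Processor get(); }

    // (C) Separate interface with a default, which the supplier extends.
    interface StoreProviderC { default Set<String> stores() { return null; } }
    interface SupplierC extends StoreProviderC { Processor get(); }

    // Under (B) the library has to detect the opt-in with instanceof.
    static Set<String> storesOf(SupplierB supplier) {
        return supplier instanceof StoreProviderB
            ? ((StoreProviderB) supplier).stores()
            : null;
    }

    static final class CountingSupplierB implements SupplierB, StoreProviderB {
        @Override public Processor get() { return (key, value) -> { }; }
        @Override public Set<String> stores() { return Collections.singleton("counts"); }
    }

    public static void main(String[] args) {
        // Existing lambda-based suppliers keep compiling under every option,
        // because each supplier interface still has a single abstract method.
        SupplierA legacyA = () -> (key, value) -> { };
        SupplierB legacyB = () -> (key, value) -> { };
        SupplierC legacyC = () -> (key, value) -> { };

        System.out.println(legacyA.stores());                  // prints null
        System.out.println(storesOf(legacyB));                 // prints null
        System.out.println(legacyC.stores());                  // prints null
        System.out.println(storesOf(new CountingSupplierB())); // prints [counts]
    }
}
```

From the user's side (A) and (C) look the same; the only difference is whether the default method lives on the supplier itself or on a shared parent interface.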



Option (C) is the same as (A) from a user point of view because a user
won't care about the new interface. It only makes a difference for our
code base, as we can share the default implementation of the new method.
This is only a small gain, as the implementation is trivial, but also a
small drawback, as we add a new public interface that is useless to the
user because the user would never implement the interface directly.



For (A/C), it might be simpler for users to detect the feature. For (B),
we have the advantage that users must implement the method if they use
the new interface.

Overall, it seems that (A) might be the best choice because it makes the
feature more easily discoverable and does not add a "useless" interface. If
you want to go with (C) to share the default implementation code, that's
also fine with me. I am convinced now (even though I brought it up) that
(B) might not be optimal, because feature discoverability seems to be
important.




About `null` vs `emptyList`: I still tend to like `null` better but it's
really a detail and not too important. Note that the question only
arises for (A/C), but not for (B) because for (B) we don't need a
default implementation.




@Paul: It's unclear to me atm what your final proposal is because you
mentioned that you might want to rename `StateStoreConnector`? It's also
unclear to me atm, if you prefer (A), (B), or (C).

Maybe you can update the KIP if necessary and clearly state what your
final proposal is. Besides this, it seems we can move to a VOTE?



-Matthias





On 5/2/19 3:01 PM, Bruno Cadonna wrote:
> Hi Paul,
> 
> I will try to express myself a bit more clearly.
> 
> Ad 1)
> My assumption is that if `StateStoreConnector#stateStores()` returns `null`,
> Kafka Streams will throw an NPE, because on purpose no null check is
> performed before the loop that calls `StreamsBuilder#addStateStore()`. When
> the user finally understands the cause of the NPE, she knows that she has
> to override `StateStoreConnector#stateStores()` in her implementation. My
> question was: why let the user discover at runtime that she has to override
> the method, when you could instead leave out the default implementation of
> `StateStoreConnector#stateStores()` and let the compiler tell the user that
> she needs to override it? Not providing a default implementation
> without separating the interfaces implies not being backward-compatible.
> That means, if we choose not to provide a default implementation and let
> the compiler signal the necessity to override the method, we have to
> separate the interfaces in any case.
> 
> Ad 2)
> If you check for `null` or empty list in `process` and do not call
> `addStateStores` in those cases, the advantage of returning `null` that
> Matthias mentioned (bugs are easier to detect) would be lost. But maybe I am
> missing something here.
> 
> Best,
> Bruno
> 
> 
> 
> On Wed, May 1, 2019 at 6:27 AM Paul Whalen  wrote:
> 
>> I definitely don't mind anyone jumping in, Bruno, thanks for the comments!
>>
>> 1) I'm not totally sure I'm clear on your point, but I think we're on the
>> same page - if we're adding a method to the XSupplier interfaces (by making
>> them inherit from a super interface StateStoreConnector) then we definitely
>> need a default implementation to maintain compatibility.  Whether the
>> default implementation returns null or an empty list is somewhat of a
>> detail.
>>
>> 2) If stream.process() sees that StateStoreConnector#stateStores() returns
>> either null or an empty list, it would handle that case specifically and
>> not try to call addStateStore at all.  Or is this not what you're asking?
>>
>> Separately, I'm still hacking away at the details of the PR and will
>> continue to get something into a discussable state, but I'll share some
>> thoughts I've run into.

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-05-02 Thread Bruno Cadonna
Hi Paul,

I will try to express myself a bit more clearly.

Ad 1)
My assumption is that if `StateStoreConnector#stateStores()` returns `null`,
Kafka Streams will throw an NPE, because on purpose no null check is
performed before the loop that calls `StreamsBuilder#addStateStore()`. When
the user finally understands the cause of the NPE, she knows that she has
to override `StateStoreConnector#stateStores()` in her implementation. My
question was: why let the user discover at runtime that she has to override
the method, when you could instead leave out the default implementation of
`StateStoreConnector#stateStores()` and let the compiler tell the user that
she needs to override it? Not providing a default implementation
without separating the interfaces implies not being backward-compatible.
That means, if we choose not to provide a default implementation and let
the compiler signal the necessity to override the method, we have to
separate the interfaces in any case.

Ad 2)
If you check for `null` or empty list in `process` and do not call
`addStateStores` in those cases, the advantage of returning `null` that
Matthias mentioned (bugs are easier to detect) would be lost. But maybe I am
missing something here.

Best,
Bruno



On Wed, May 1, 2019 at 6:27 AM Paul Whalen  wrote:

> I definitely don't mind anyone jumping in, Bruno, thanks for the comments!
>
> 1) I'm not totally sure I'm clear on your point, but I think we're on the
> same page - if we're adding a method to the XSupplier interfaces (by making
> them inherit from a super interface StateStoreConnector) then we definitely
> need a default implementation to maintain compatibility.  Whether the
> default implementation returns null or an empty list is somewhat of a
> detail.
>
> 2) If stream.process() sees that StateStoreConnector#stateStores() returns
> either null or an empty list, it would handle that case specifically and
> not try to call addStateStore at all.  Or is this not what you're asking?
>
> Separately, I'm still hacking away at the details of the PR and will
> continue to get something into a discussable state, but I'll share some
> thoughts I've run into.
>
> A) I'm tentatively going the separate interface route (Matthias's
> suggestion) and naming it ConnectedStoreProvider.  Still don't love the
> name, but there's something nice about the name indicating *why* this thing
> is providing the store, not just that it is providing it.
>
> B) It has occurred to me that topology.addProcessor() could also recognize
> if ProcessorSupplier implements ConnectedStoreProvider and add and connect
> stores appropriately.  This isn't in the KIP and I think the value-add is
> lower (if you're reaching that low level, surely the "auto add/connect
> store" isn't too important to you), but I think it would be confusing if
> it didn't, and I don't see any real downside.
>
> Paul
>
> On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna  wrote:
>
> > Hi,
> >
> > @Paul: Thank you for the KIP!
> >
> > I hope you do not mind that I jump in.
> >
> > I have the following comments:
> >
> > 1) `null` vs empty list in the default implementation
> > IIUC, returning `null` in the default implementation should basically
> > signal that the method `stateStores` was not overridden. Why then provide a
> > default implementation in the first place? Without default implementation
> > you would discover the missing implementation already at compile-time and
> > not only at runtime. If you decide not to provide a default implementation,
> > `XSupplier extends StateStoreConnector` would break existing code as
> > Matthias has already pointed out.
> >
> > 2) `process` method adding the StoreBuilders to the topology
> > If the default implementation returned `null` and `XSupplier extends
> > StateStoreConnector`, then existing code would break, because
> > `StreamsBuilder#addStateStore()` would throw an NPE.
> >
> > +1 for opening a WIP PR
> >
> > Best,
> > Bruno
> >
> >
> > On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax 
> > wrote:
> >
> > > Thanks Paul!
> > >
> > > I agree with all of that. If we think that the general design is good,
> > > refactoring a PR if we want to pick a different name should not be too
> > > much additional work (hopefully). Thus, if you want to open a WIP PR and
> > > we use it to nail the open details, it might help to find a good
> > > conclusion.
> > >
> > > >> 2) Default method vs new interface:
> > >
> > > This seems to be the hardest tradeoff. I see the point about
> > > discoverability... Might be good to get input from others, which version
> > > they would prefer.
> > >
> > > Just to make clear, my suggestion from the last email was, that
> > > `Transformer` etc does not extend the new interface. Instead, a user
> > > that wants to use this feature would need to implement both interfaces.
> > >
> > > If `Transformer extends StoreProvider` (just picking a name here)
> > > without default implementation existing code would break and thus it is
> > > not an option because of breaking backward compatibility.

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-30 Thread Paul Whalen
I definitely don't mind anyone jumping in, Bruno, thanks for the comments!

1) I'm not totally sure I'm clear on your point, but I think we're on the
same page - if we're adding a method to the XSupplier interfaces (by making
them inherit from a super interface StateStoreConnector) then we definitely
need a default implementation to maintain compatibility.  Whether the
default implementation returns null or an empty list is somewhat of a
detail.

2) If stream.process() sees that StateStoreConnector#stateStores() returns
either null or an empty list, it would handle that case specifically and
not try to call addStateStore at all.  Or is this not what you're asking?
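Point 2 might look roughly like this, assuming the XSupplier interfaces extend a `StateStoreConnector`-like parent whose default `stateStores()` returns `null`. All names are hypothetical stand-ins, and the returned list only stands in for the `addStateStore()`/connect calls; this is not the code in the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: StoreConnector ~ StateStoreConnector, Supplier ~ an XSupplier
// extending it with a default, and the returned list stands in for the
// addStateStore()/connect calls that stream.process() would make.
public class ProcessWiringSketch {

    interface Processor { void process(String key, String value); }

    interface StoreConnector {
        default Set<String> stateStores() { return null; }
    }

    interface Supplier extends StoreConnector {
        Processor get();
    }

    // Returns the stores that would be added to the topology and connected.
    static List<String> storesToWire(Supplier supplier) {
        Set<String> stores = supplier.stateStores();
        if (stores == null || stores.isEmpty()) {
            // Default implementation (or nothing declared): skip the loop entirely,
            // so existing suppliers never reach addStateStore() with null.
            return Collections.emptyList();
        }
        List<String> wired = new ArrayList<>();
        for (String store : stores) {
            wired.add(store); // stands in for addStateStore(builder) plus connecting it
        }
        return wired;
    }

    public static void main(String[] args) {
        Supplier legacy = () -> (key, value) -> { };
        Supplier owning = new Supplier() {
            @Override public Processor get() { return (key, value) -> { }; }
            @Override public Set<String> stateStores() { return Collections.singleton("counts"); }
        };
        System.out.println(storesToWire(legacy)); // prints []
        System.out.println(storesToWire(owning)); // prints [counts]
    }
}
```

Because `null` and an empty set are treated identically here, a `null` return no longer carries any extra signal; that is the trade-off between failing fast and staying lenient.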

Separately, I'm still hacking away at the details of the PR and will
continue to get something into a discussable state, but I'll share some
thoughts I've run into.

A) I'm tentatively going the separate interface route (Matthias's
suggestion) and naming it ConnectedStoreProvider.  Still don't love the
name, but there's something nice about the name indicating *why* this thing
is providing the store, not just that it is providing it.

B) It has occurred to me that topology.addProcessor() could also recognize
if ProcessorSupplier implements ConnectedStoreProvider and add and connect
stores appropriately.  This isn't in the KIP and I think the value-add is
lower (if you're reaching that low level, surely the "auto add/connect
store" isn't too important to you), but I think it would be confusing if
it didn't, and I don't see any real downside.

Paul

On Tue, Apr 30, 2019 at 4:18 AM Bruno Cadonna  wrote:

> Hi,
>
> @Paul: Thank you for the KIP!
>
> I hope you do not mind that I jump in.
>
> I have the following comments:
>
> 1) `null` vs empty list in the default implementation
> IIUC, returning `null` in the default implementation should basically
> signal that the method `stateStores` was not overridden. Why then provide a
> default implementation in the first place? Without default implementation
> you would discover the missing implementation already at compile-time and
> not only at runtime. If you decide not to provide a default implementation,
> `XSupplier extends StateStoreConnector` would break existing code as
> Matthias has already pointed out.
>
> 2) `process` method adding the StoreBuilders to the topology
> If the default implementation returned `null` and `XSupplier extends
> StateStoreConnector`, then existing code would break, because
> `StreamsBuilder#addStateStore()` would throw an NPE.
>
> +1 for opening a WIP PR
>
> Best,
> Bruno
>
>
> On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax 
> wrote:
>
> > Thanks Paul!
> >
> > I agree with all of that. If we think that the general design is good,
> > refactoring a PR if we want to pick a different name should not be too
> > much additional work (hopefully). Thus, if you want to open a WIP PR and
> > we use it to nail the open details, it might help to find a good
> > conclusion.
> >
> > >> 2) Default method vs new interface:
> >
> > This seems to be the hardest tradeoff. I see the point about
> > discoverability... Might be good to get input from others, which version
> > they would prefer.
> >
> > Just to make clear, my suggestion from the last email was, that
> > `Transformer` etc does not extend the new interface. Instead, a user
> > that wants to use this feature would need to implement both interfaces.
> >
> > If `Transformer extends StoreProvider` (just picking a name here)
> > without default implementation existing code would break and thus it is not
> > an option because of breaking backward compatibility.
> >
> >
> > -Matthias
> >
> > On 4/28/19 8:37 PM, Paul Whalen wrote:
> > > Great thoughts Matthias, thanks! I think we're all agreed that naming and
> > > documentation/education are the biggest hurdles for this KIP, and in light
> > > of that, I think it makes sense for me to just take a stab at a
> > > full-fledged PR with documentation to convince us that it's possible to do
> > > it with enough clarity.
> > >
> > > In response to your specific thoughts:
> > >
> > > 1) StateStoreConnector as a name: Really good point about defining the
> > > difference between "adding" and "connecting."  Guozhang suggested
> > > StateStoreConnector which was definitely an improvement over my
> > > StateStoresSupplier, but I think you're right that we need to be careful to
> > > make it clear that it's really accomplishing both.  Thinking about it now,
> > > one problem with Connector is that the implementer of the interface is not
> > > really doing any connecting; it's providing/supplying the store that will
> > > be both added and connected.  StoreProvider seems reasonable to me and
> > > probably the best candidate at the moment, but it would be nice if the name
> > > could convey that it's providing the store specifically so the caller can
> > > add it to the topology and connect it to the associated transformer.
> > >
> > > In general I think that really calling out what "adding" versus
> > > "connecting" is in the documentation will help make the entire purpose of
> > > this feature more clear to the user.

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-30 Thread Bruno Cadonna
Hi,

@Paul: Thank you for the KIP!

I hope you do not mind that I jump in.

I have the following comments:

1) `null` vs empty list in the default implementation
IIUC, returning `null` in the default implementation should basically
signal that the method `stateStores` was not overridden. Why then provide a
default implementation in the first place? Without default implementation
you would discover the missing implementation already at compile-time and
not only at runtime. If you decide not to provide a default implementation,
`XSupplier extends StateStoreConnector` would break existing code as
Matthias has already pointed out.

2) `process` method adding the StoreBuilders to the topology
If the default implementation returned `null` and `XSupplier extends
StateStoreConnector`, then existing code would break, because
`StreamsBuilder#addStateStore()` would throw an NPE.

+1 for opening a WIP PR

Best,
Bruno


On Sun, Apr 28, 2019 at 10:57 PM Matthias J. Sax 
wrote:

> Thanks Paul!
>
> I agree with all of that. If we think that the general design is good,
> refactoring a PR if we want to pick a different name should not be too
> much additional work (hopefully). Thus, if you want to open a WIP PR and
> we use it to nail the open details, it might help to find a good
> conclusion.
>
> >> 2) Default method vs new interface:
>
> This seems to be the hardest tradeoff. I see the point about
> discoverability... Might be good to get input from others, which version
> they would prefer.
>
> Just to make clear, my suggestion from the last email was, that
> `Transformer` etc does not extend the new interface. Instead, a user
> that wants to use this feature would need to implement both interfaces.
>
> If `Transformer extends StoreProvider` (just picking a name here)
> without default implementation existing code would break and thus it is not
> an option because of breaking backward compatibility.
>
>
> -Matthias
>
> On 4/28/19 8:37 PM, Paul Whalen wrote:
> > Great thoughts Matthias, thanks! I think we're all agreed that naming and
> > documentation/education are the biggest hurdles for this KIP, and in light
> > of that, I think it makes sense for me to just take a stab at a
> > full-fledged PR with documentation to convince us that it's possible to do
> > it with enough clarity.
> >
> > In response to your specific thoughts:
> >
> > 1) StateStoreConnector as a name: Really good point about defining the
> > difference between "adding" and "connecting."  Guozhang suggested
> > StateStoreConnector which was definitely an improvement over my
> > StateStoresSupplier, but I think you're right that we need to be careful to
> > make it clear that it's really accomplishing both.  Thinking about it now,
> > one problem with Connector is that the implementer of the interface is not
> > really doing any connecting; it's providing/supplying the store that will
> > be both added and connected.  StoreProvider seems reasonable to me and
> > probably the best candidate at the moment, but it would be nice if the name
> > could convey that it's providing the store specifically so the caller can
> > add it to the topology and connect it to the associated transformer.
> >
> > In general I think that really calling out what "adding" versus
> > "connecting" is in the documentation will help make the entire purpose of
> > this feature more clear to the user.
> >
> > 2) Default method vs new interface: The choice of a default method was
> > influenced by Guozhang's fear about API bloat/discoverability.  I can
> > definitely see it both ways.  Would the separate interface be a
> > sub-interface of Processor/TransformerSupplier or standalone?  It seems
> > like you're suggesting standalone and I think that's what I favor.  My only
> > concern there is that the interface wouldn't actually be a type to any
> > public API which sort of hurts discoverability.  You would have to read the
> > javadocs for stream.process/transform() to discover that implementing the
> > interface in addition to Processor/TransformerSupplier would add and
> > connect the store for you.  But that added burden actually probably helps
> > us in terms of making sure people don't mix and match, like you said.
> >
> > 3) Returning null instead of empty: Seems fair to me.  I always worry about
> > returning null when an empty collection can be used instead, but given that
> > the library is the caller rather than the client I think your point makes
> > sense here.
> >
> > 4) Returning Set instead of Collection: Agreed, don't see why not to make
> > it more specific.
> >
> > Paul
> >
> > On Fri, Apr 26, 2019 at 2:30 AM Matthias J. Sax 
> > wrote:
> >
> >> Hi, sorry for the long pause. Just trying to catch up here.
> >>
> >> I think it save to allow `addStateStore()` to be idempotent for the same
> >> `StoreBuilder` object. In fact, the `name` is "hard coded" and thus it's
> >> not really possible to use the same `StoreBuilder` object to create
> >> different sto

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-28 Thread Matthias J. Sax
Thanks Paul!

I agree with all of that. If we think the general design is good,
refactoring the PR to pick a different name should not be much
additional work (hopefully). So if you want to open a WIP PR, we can use
it to nail down the open details; that might help us reach a good conclusion.

>> 2) Default method vs new interface: 

This seems to be the hardest tradeoff. I see the point about
discoverability... It might be good to get input from others on which
version they would prefer.

Just to be clear, my suggestion in the last email was that
`Transformer` etc. do not extend the new interface. Instead, a user
who wants to use this feature would need to implement both interfaces.

If `Transformer extends StoreProvider` (just picking a name here)
without a default implementation, existing code would break; thus it is
not an option, because it would break backward compatibility.
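A minimal sketch of that opt-in pattern, assuming simplified plain-Java stand-ins so it runs without Kafka on the classpath: `ToyStoreBuilder`, `ToyTransformerSupplier`, and `Wiring` are hypothetical placeholders for `StoreBuilder`, `TransformerSupplier`, and the library's internal wiring, and `StoreProvider` is only the working name from this thread, not an existing API.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for StoreBuilder: only the store name matters here.
final class ToyStoreBuilder {
    private final String name;
    ToyStoreBuilder(String name) { this.name = name; }
    String name() { return name; }
}

// Hypothetical stand-in for TransformerSupplier: deliberately does NOT extend StoreProvider.
interface ToyTransformerSupplier<T> extends Supplier<T> {}

// Separate opt-in interface ("StoreProvider" is just the working name used in this thread).
interface StoreProvider {
    Set<ToyStoreBuilder> stores(); // no default: implementing it is a conscious opt-in
}

// A user opts in by implementing both interfaces.
final class CountingSupplier implements ToyTransformerSupplier<Runnable>, StoreProvider {
    private final ToyStoreBuilder counts = new ToyStoreBuilder("counts");
    @Override public Runnable get() { return () -> {}; } // transformer logic omitted
    @Override public Set<ToyStoreBuilder> stores() { return Collections.singleton(counts); }
}

// How the library side might detect the opt-in: a plain instanceof check.
final class Wiring {
    static Set<String> storesToAddAndConnect(ToyTransformerSupplier<?> supplier) {
        Set<String> names = new LinkedHashSet<>();
        if (supplier instanceof StoreProvider) {
            for (ToyStoreBuilder b : ((StoreProvider) supplier).stores()) {
                names.add(b.name());
            }
        }
        return names; // empty: the user adds and connects stores manually, as today
    }
}
```

A supplier that implements only `ToyTransformerSupplier` compiles unchanged and takes the manual path. The trade-off is that the opt-in interface never appears in a method signature, so users can only discover it through documentation.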


-Matthias


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-28 Thread Paul Whalen
Great thoughts Matthias, thanks! I think we're all agreed that naming and
documentation/education are the biggest hurdles for this KIP, and in light
of that, I think it makes sense for me to just take a stab at a full-fledged
PR with documentation to convince us that it's possible to do it
with enough clarity.

In response to your specific thoughts:

1) StateStoreConnector as a name: Really good point about defining the
difference between "adding" and "connecting."  Guozhang suggested
StateStoreConnector which was definitely an improvement over my
StateStoresSupplier, but I think you're right that we need to be careful to
make it clear that it's really accomplishing both.  Thinking about it now,
one problem with Connector is that the implementer of the interface is not
really doing any connecting; it's providing/supplying the store that will
be both added and connected.  StoreProvider seems reasonable to me and is
probably the best candidate at the moment, but it would be nice if the name
could convey that it's providing the store specifically so the caller can
add it to the topology and connect it to the associated transformer.

In general I think that really calling out what "adding" versus
"connecting" is in the documentation will help make the entire purpose of
this feature more clear to the user.

2) Default method vs new interface: The choice of a default method was
influenced by Guozhang's fear about API bloat/discoverability.  I can
definitely see it both ways.  Would the separate interface be a
sub-interface of Processor/TransformerSupplier or standalone?  It seems
like you're suggesting standalone, and I think that's what I favor.  My only
concern there is that the interface wouldn't actually appear as a type in any
public API signature, which sort of hurts discoverability.  You would have to read the
javadocs for stream.process/transform() to discover that implementing the
interface in addition to Processor/TransformerSupplier would add and
connect the store for you.  But that added burden actually probably helps
us in terms of making sure people don't mix and match, like you said.

3) Returning null instead of empty: Seems fair to me.  I always worry about
returning null when an empty collection can be used instead, but given that
the library is the caller rather than the client I think your point makes
sense here.

4) Returning Set instead of Collection: Agreed, don't see why not to make
it more specific.

Paul


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-26 Thread Matthias J. Sax
Hi, sorry for the long pause. Just trying to catch up here.

I think it is safe to allow `addStateStore()` to be idempotent for the same
`StoreBuilder` object. In fact, the `name` is "hard coded", and thus it's
not really possible to use the same `StoreBuilder` object to create
different stores.

I also agree with the concern that only allowing a single store (as
proposed by Ivan) might be too restrictive.

Overall, the current KIP version LGTM. I don't have major concerns about
user education for this case, but I agree that we need to document this
clearly.

Some further comments:

(1) I am not sure if `StateStoreConnector` is the best name for the new
interface. Note that there are two concepts related to stores:

 - adding a store: this makes the store available in the topology in
general (however, the store is still "dangling", and not used)
 - connecting a store: this allows a processor etc to use a store

The new interface does both, but its name only indicates the second
part, which might be confusing. It might be especially confusing because
we want to disallow mixing the existing "manually add and connect"
pattern with the new "auto add+connect" pattern. If the new interface
name indicates only the connect part, users might think they need to add
stores manually and can connect them automatically.
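To make the distinction concrete, here is a toy model in plain Java; it is not the Kafka Streams API, and `ToyTopology` and `ToyStoreBuilder` are hypothetical. It also sketches the idempotent `addStateStore()` mentioned above, under the assumption that "the same `StoreBuilder`" means the same object instance.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for StoreBuilder; object identity decides "same builder".
final class ToyStoreBuilder {
    private final String name;
    ToyStoreBuilder(String name) { this.name = name; }
    String name() { return name; }
}

// Toy topology that keeps "adding" a store separate from "connecting" it.
final class ToyTopology {
    private final Map<String, ToyStoreBuilder> added = new HashMap<>();
    private final Map<String, Set<String>> connections = new HashMap<>(); // store -> processors

    // Adding: the store exists in the topology but stays "dangling" until connected.
    // Idempotent for the same builder instance; a different builder with the same name is rejected.
    void addStateStore(ToyStoreBuilder builder) {
        ToyStoreBuilder existing = added.putIfAbsent(builder.name(), builder);
        if (existing != null && existing != builder) {
            throw new IllegalArgumentException(
                "store '" + builder.name() + "' already added with a different builder");
        }
    }

    // Connecting: lets a processor use a store that has already been added.
    void connect(String processor, String storeName) {
        if (!added.containsKey(storeName)) {
            throw new IllegalArgumentException("store '" + storeName + "' has not been added");
        }
        connections.computeIfAbsent(storeName, k -> new HashSet<>()).add(processor);
    }

    boolean isDangling(String storeName) {
        return added.containsKey(storeName) && !connections.containsKey(storeName);
    }

    // What an "auto add+connect" interface would do on the user's behalf.
    void addAndConnect(String processor, ToyStoreBuilder builder) {
        addStateStore(builder);
        connect(processor, builder.name());
    }
}
```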

Unfortunately, I don't have a much better suggestion for a name either.
The only idea that came to my mind was `StoreProvider`: to me, a
provider is a "service" interface that does work for us, i.e., it adds and
connects a store. I'm not sure if this is too subtle, considering that
there is already a `StoreSupplier` interface.

But maybe somebody else has a good idea on how to improve
the name.

In any case, I would suggest shortening the name to `StoreConnector`
instead of `StateStoreConnector`, because we also have `StoreSupplier`
and `StoreBuilder`.



(2) The KIP proposes to add the new interface to `ProcessorSupplier` etc.
and to add a default implementation for the new method. Hence, users
would need to override this default implementation to opt in to the
feature. I am wondering if it might be better not to add the new interface
to `ProcessorSupplier` etc., and instead just provide a new interface with no
default implementation. Users would opt in by adding the interface
explicitly to their existing `ProcessorSupplier` implementation.
Overriding a default method and getting different behavior seems
a little subtle to me, especially because we don't want to allow
mixing and matching the old and new approaches. Think: I only override a
default method and my code breaks.
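For comparison, a sketch of the default-method variant with the same kind of hypothetical plain-Java stand-ins (`StateStoresSupplier` follows the name used in the earlier proof of concept; `Wiring.transform` is invented for illustration and assumes the proposed ban on mixing both mechanisms). The two suppliers differ only in one overridden default method, yet that alone decides whether passing store names by hand is still accepted.

```java
import java.util.Collections;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for StoreBuilder.
final class ToyStoreBuilder {
    private final String name;
    ToyStoreBuilder(String name) { this.name = name; }
    String name() { return name; }
}

// Variant under discussion: every supplier inherits stores() with a default.
interface StateStoresSupplier {
    default Set<ToyStoreBuilder> stores() { return Collections.emptySet(); }
}

// Stand-in for TransformerSupplier extending the new interface.
interface ToyTransformerSupplier<T> extends Supplier<T>, StateStoresSupplier {}

// A user opts in only by overriding the inherited default method.
final class OptedInSupplier implements ToyTransformerSupplier<Runnable> {
    @Override public Runnable get() { return () -> {}; } // transformer logic omitted
    @Override public Set<ToyStoreBuilder> stores() {
        return Collections.singleton(new ToyStoreBuilder("counts"));
    }
}

final class Wiring {
    // Hypothetical stand-in for stream.transform(supplier, storeNames...).
    static void transform(ToyTransformerSupplier<?> supplier, String... storeNames) {
        if (!supplier.stores().isEmpty() && storeNames.length > 0) {
            // Mixing "manual add+connect" with "auto add+connect" is rejected.
            throw new IllegalArgumentException("stores declared both by the supplier and by name");
        }
        // ... add and connect the stores here ...
    }
}
```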

Thoughts?



(3) If we keep the current default implementation for the new method, I
am wondering if it should return `null` instead of an empty collection.
This might be safer for detecting bugs in user code that accidentally
returns an empty collection.
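One possible shape of the `null` default, again as a hypothetical plain-Java sketch (`StateStoresSupplier`, `ToyStoreBuilder`, and `Wiring.storesToWire` are illustrative names): `null` is read as "not opted in", while an overridden method that returns an empty set is flagged as a likely bug. The library side must check for `null` before iterating; otherwise every supplier that did not override the method would fail with a `NullPointerException`.

```java
import java.util.Set;

// Hypothetical stand-in for StoreBuilder.
final class ToyStoreBuilder {
    private final String name;
    ToyStoreBuilder(String name) { this.name = name; }
    String name() { return name; }
}

// Variant with a null default: null means "this supplier did not opt in".
interface StateStoresSupplier {
    default Set<ToyStoreBuilder> stores() { return null; }
}

final class Wiring {
    // Returns how many stores would be added and connected for this supplier.
    static int storesToWire(StateStoresSupplier supplier) {
        Set<ToyStoreBuilder> stores = supplier.stores();
        if (stores == null) {
            return 0; // not opted in; iterating without this guard would throw an NPE
        }
        if (stores.isEmpty()) {
            // The method was overridden but returned nothing: likely a bug in user code.
            throw new IllegalStateException("stores() was overridden but returned an empty set");
        }
        return stores.size();
    }
}
```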



(4) Should the new method return a `Set` instead of a `Collection` to
indicate the semantics clearly (i.e., returning the same `StoreBuilder`
multiple times is idempotent, and one cannot add+connect it twice)?



-Matthias


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-04-06 Thread Paul Whalen
Ivan and Guozhang,

Thanks for the thoughts!  Ivan's use case is definitely interesting.  The
way I see it, if we can achieve the main goal of the KIP (allowing
Processor/TransformerSuppliers to encapsulate their usage of state stores),
we will enable this kind of thing in "user space" very easily.

I will say that I'm not totally sure that most use cases of transform() use
just one state store.  It's hard to know since I haven't seen many examples
in public, but my team's usages almost exclusively require multiple state
stores.  We only reach for the low-level Processor API when we need that
complexity, and it's somewhat hard to imagine many use cases that only need
one state store, since the high-level DSL can usually accomplish those
tasks.  The example Ivan presented, for instance, looks like a
stream.groupByKey().reduce(...) to me.  Ivan, I'd be curious what sort of
other usages you're imagining.

That being said, perhaps the Processor API should really be considered
a separate paradigm in Streams, not just a lower level that we reach for
when necessary.  In that case, it would be beneficial to make the simple
use cases easier.  I've definitely talked about this with my own team: if
you're less familiar with the kind of functional style that the high-level
DSL offers, it might be easier to "see" your state and interact with it
directly.

Anyway, I've updated the KIP to reflect my current PR with Guozhang's
suggestions.  It seems like there is at least some interest in that on its
own and not a ton of pushback, so I think I will try to start a vote.

Paul


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-03-30 Thread Ivan Ponomarev

Hi all!

I was about to write another KIP, but found out that KIP-401 addresses 
exactly the problem I faced. So let me jump into your discussion and ask 
you to assess another idea.


I fully agree with KIP-401's motivation section. E.g., in my project I
had to invent a wrapper class that hides the details of KeyValueStore
management from the business logic. Of course, this should be handled
better in the KStreams API.


But I was about to look at this problem from another side and propose a
simple alternative in the high-level DSL that would not fit all cases,
but would fit most of them. Hence my idea does not exclude Paul's proposal.


What if we restrict ourselves to *only one* KeyValueStore and propose a 
method that resembles `aggregate` and `reduce` methods, like this:


stream
   .map(...)
   .filter(...)
   .transform ((k, v, s)->{}, Transformed.with())

where
* k, v -- input key & value
* s -- a KeyValueStore provided as an argument
* return value of the lambda should be KeyValue.pair(...)
* Transformed.with(...) is a builder used to define the
Transformer and KeyValueStore building parameters. Some of these
parameters should be:
** store's KeySerde,
** store's ValueSerde,
** whether the store is persistent or in-memory,
** store's name -- an optional parameter; the system should be able to
devise the name of the store transparently for the user, unless we want
to name it ourselves or share the store between processors.
** scheduled punctuation.

Imagine we have a KStream, and we need to calculate a 
`derivative` stream, that is, a stream of 'deltas' of the provided 
integer values.


This could be achieved as simply as:

stream.transform((key, value, stateStore) -> {
    int previousValue =
        Optional.ofNullable(stateStore.get(key)).orElse(0);
    stateStore.put(key, value);
    return KeyValue.pair(key, value - previousValue);
    },
    // we do not need to bother with the store name, punctuation, etc.
    // maybe even the Serde part can be omitted, since we can inherit
    // the serdes from the stream by default
    Transformed.with(Serdes.String(), Serdes.Integer()));

The hard part of it is that new `transform` method definition should be 
parameterized by six type parameters:


* input/output/KeyValueStore key type,
* input/output/KeyValueStore value type.

However, it seems that all these types can be inferred from the provided 
lambda and Transformed.with instances.
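A runnable approximation of the delta example, assuming plain-Java stand-ins rather than Kafka Streams classes: `ToyStream`, `StoreSpec`, `Pair`, and `StatefulTransformer` are hypothetical, and a `HashMap` plays the role of the `KeyValueStore`. It also exercises the type-inference point with these simplified types: the stream fixes the input key and value types, `transform` adds four more type parameters, and the call site declares none of them explicitly.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for KeyValue.pair(...): an immutable key/value pair.
final class Pair<A, B> {
    final A key;
    final B value;
    private Pair(A key, B value) { this.key = key; this.value = value; }
    static <A, B> Pair<A, B> of(A key, B value) { return new Pair<>(key, value); }
}

// Hypothetical stand-in for Transformed.with(...): here it only carries the store's key/value types.
final class StoreSpec<KS, VS> {
    private StoreSpec() {}
    static <KS, VS> StoreSpec<KS, VS> of(Class<KS> keyType, Class<VS> valueType) {
        return new StoreSpec<>();
    }
}

// The lambda shape from the proposal: (key, value, store) -> output pair.
@FunctionalInterface
interface StatefulTransformer<K, V, KS, VS, KR, VR> {
    Pair<KR, VR> apply(K key, V value, Map<KS, VS> store);
}

// Toy in-memory stream: K and V come from the stream, transform() adds KS, VS, KR, VR.
final class ToyStream<K, V> {
    final List<Pair<K, V>> records;
    ToyStream(List<Pair<K, V>> records) { this.records = records; }

    <KS, VS, KR, VR> ToyStream<KR, VR> transform(
            StatefulTransformer<K, V, KS, VS, KR, VR> fn, StoreSpec<KS, VS> spec) {
        Map<KS, VS> store = new HashMap<>(); // stands in for a KeyValueStore
        List<Pair<KR, VR>> out = new ArrayList<>();
        for (Pair<K, V> r : records) {
            out.add(fn.apply(r.key, r.value, store));
        }
        return new ToyStream<>(out);
    }
}

final class DeltaDemo {
    // Emits, per key, the difference between the current and the previous value.
    static ToyStream<String, Integer> deltas(ToyStream<String, Integer> stream) {
        return stream.transform((key, value, store) -> {
            int previous = Optional.ofNullable(store.get(key)).orElse(0);
            store.put(key, value);
            return Pair.of(key, value - previous);
        }, StoreSpec.of(String.class, Integer.class));
    }
}
```

For the input `a=5, a=8, b=3, a=6` this yields `a=5, a=3, b=3, a=-2`.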


What do you think about this?

Regards,

Ivan


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-03-27 Thread Guozhang Wang
Hello Paul,

Thanks for the uploaded PR and the detailed description! I've made a pass
on it and left some comments.

Overall I think I agree with you that passing in the StoreBuilder directly
rather than the store name is more convenient, as it does not require another
`addStore` call, but we just need to spend some more documentation effort
on educating users about the two ways of connecting their stores. I'm
slightly concerned about this learning curve, but I can be convinced if
most people feel it is worthwhile.


Guozhang

On Sat, Mar 23, 2019 at 5:15 PM Paul Whalen  wrote:

> I'd like to resurrect this discussion with a cursory, proof-of-concept
> implementation of the KIP which combines many of our ideas:
> https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
> small as possible for now, just using it to convey the main ideas.  But
> I'll separately address some of our earlier discussion:
>
>    - Will there be a new, separate interface for users to implement for the
>    new functionality? No, to hopefully keep things simple, all of the
>    Processor/TransformerSupplier interfaces will just extend
>    StateStoresSupplier, allowing users to opt in to this functionality by
>    overriding the default implementation that gives an empty list.
>    - Will the interface allow users to specify the store name, or the
>    entire StoreBuilder? The entire StoreBuilder, so the
>    Processor/TransformerSupplier can completely encapsulate name and
>    implementation of a state store if desired.
>    - Will the old way of specifying store names alongside the supplier when
>    calling stream.process/transform() be deprecated? No, this is still a
>    legitimate way to wire up Processors/Transformers and their stores. But I
>    would recommend not allowing stream.process/transform() calls that use both
>    store declaration mechanisms (this restriction is not in the proof of
>    concept)
>    - How will we handle adding the same state store to the topology
>    multiple times because different Processor/TransformerSuppliers declare it?
>    topology.addStateStore() will be slightly relaxed for convenience, and will
>    allow adding the same StoreBuilder multiple times as long as the exact same
>    StoreBuilder instance is being added for the same store name.  This seems
>    to prevent in practice the issue of accidentally merging two state stores
>    into one by adding them with the same name.  For additional safety, if we
>    wanted to (not in the proof of concept), we could allow for this relaxation
>    only for internal callers of topology.addStateStore().
>
> So, in summary, the use cases look like:
>
>    - 1 transformer/processor that owns its store: Using the new
>    StateStoresSupplier interface method to supply its StoreBuilders that will
>    be added to the topology automatically.
>    - Multiple transformer/processors that share the same store: Either
>       1. The old way: the StoreBuilder is defined "far away" from the
>       Transformer/Processor implementations, and is added to the topology
>       manually by the user
>       2. The new way: the StoreBuilder is defined closer to the
>       Transformer/Processor implementations, and the same instance is
>       returned by all Transformer/ProcessorSuppliers that need it
>
>
> This makes the KIP wiki a bit stale; I'll update if we want to bring this
> design to a vote.
>
> Thanks!
> Paul
>
> On Sun, Dec 16, 2018 at 6:04 PM Guozhang Wang  wrote:
>
> > Matthias / Paul,
> >
> > The concern I had about introducing `StoreBuilderSupplier` is simply
> > because it is another XXSupplier to the public API, so I'd like to ask if
> > we really have to add it :)
> >
> > The difference between encapsulating the store name and encapsulating the
> > full state store builder is that, in the former:
> >
> > ---
> >
> > String storeName = "store1";
> > builder.addStore(new MyStoreBuilder(storeName));
> > // following my proposal, the store name can be passed in and used both for
> > // `listStores` and in `Transformer#init`, so the Transformer function
> > // does not need to get the constant string name again
> > stream1.transform(new MyTransformerSupplier(storeName));
> >
> > // one caveat to admit is that the MyTransformerSupplier logic may be
> > // unique to `store1`, so it cannot be reused with a different store name anyway
> > ---
> >
> > While in the latter:
> >
> > ---
> >
> > stream1.transform(new MyTransformerSupplierForStore1);   // the name is
> > just indicating that we may have one such supplier for each store.
> >
> > ---
> >
> > I understand the latter introduce more convenience from the API, but the
> > cost is that since we still cannot completely `builder.addStore`, but
> only
> > reduce its semantic scope to shared state stores only,; hence users need
> to
> > learn two ways of creating state stores for those two patterns.
> >
> > My argument is that more public A

Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2019-03-23 Thread Paul Whalen
I'd like to resurrect this discussion with a cursory, proof-of-concept
implementation of the KIP which combines many of our ideas:
https://github.com/apache/kafka/pull/6496.  I tried to keep the diff as
small as possible for now, just using it to convey the main ideas.  But
I'll separately address some of our earlier discussion:

   - Will there be a new, separate interface for users to implement for the
   new functionality? No, to hopefully keep things simple, all of the
   Processor/TransformerSupplier interfaces will just extend
   StateStoresSupplier, allowing users to opt in to this functionality by
   overriding the default implementation that gives an empty list.
   - Will the interface allow users to specify the store name, or the
   entire StoreBuilder? The entire StoreBuilder, so the
   Processor/TransformerSupplier can completely encapsulate name and
   implementation of a state store if desired.
   - Will the old way of specifying store names alongside the supplier when
   calling stream.process/transform() be deprecated? No, this is still a
   legitimate way to wire up Processors/Transformers and their stores. But I
   would recommend not allowing stream.process/transform() calls that use both
   store declaration mechanisms (this restriction is not in the proof of
   concept).
   - How will we handle adding the same state store to the topology
   multiple times because different Processor/TransformerSuppliers declare it?
   topology.addStateStore() will be slightly relaxed for convenience, and will
   allow adding the same StoreBuilder multiple times as long as the exact same
   StoreBuilder instance is being added for the same store name.  In practice,
   this seems to prevent the issue of accidentally merging two state stores
   into one by adding them under the same name.  For additional safety, if we wanted to
   (not in the proof of concept), we could allow for this relaxation only for
   internal callers of topology.addStateStore().
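A minimal, self-contained sketch of the relaxed `addStateStore()` rule from the last bullet. It deliberately avoids Kafka classes: `SimpleStoreBuilder` and `RelaxedTopology` are hypothetical stand-ins for `StoreBuilder` and `Topology`, and `IllegalStateException` stands in for whatever exception the real topology would throw. Re-adding the identical builder instance is a no-op, while a different instance that reuses a name is still rejected.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's StoreBuilder: only the store name matters here.
final class SimpleStoreBuilder {
    private final String name;

    SimpleStoreBuilder(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Hypothetical stand-in for Topology that models only state store registration.
final class RelaxedTopology {
    private final Map<String, SimpleStoreBuilder> stores = new HashMap<>();

    // Re-adding the *same instance* under the same name is a no-op;
    // a different instance with an existing name is still rejected.
    void addStateStore(SimpleStoreBuilder builder) {
        SimpleStoreBuilder existing = stores.putIfAbsent(builder.name(), builder);
        if (existing != null && existing != builder) { // identity check, not equals()
            throw new IllegalStateException(
                "Store " + builder.name() + " was already added by a different StoreBuilder");
        }
    }

    int storeCount() {
        return stores.size();
    }
}
```

Comparing by identity rather than `equals()` is the point of this design: two separately constructed builders for "store1" are treated as two intended stores, so an accidental name clash still fails fast.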

So, in summary, the use cases look like:

   - One transformer/processor that owns its store: using the new
   StateStoresSupplier interface method to supply its StoreBuilders, which will
   be added to the topology automatically.
   - Multiple transformers/processors that share the same store, either:

      1. The old way: the StoreBuilder is defined "far away" from the
      Transformer/Processor implementations, and is added to the topology
      manually by the user.
      2. The new way: the StoreBuilder is defined closer to the
      Transformer/Processor implementations, and the same instance is returned by
      all Transformer/ProcessorSuppliers that need it.
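The opt-in default method and the "same instance" pattern for shared stores can be sketched as below (Java 9+ for `List.of`). All names are hypothetical stand-ins, not the code in the PR: `StoreBuilderStub` for `StoreBuilder`, `StoresSupplier` for the proposed `StateStoresSupplier`, and `Wiring.wire()` for what the DSL might do under the hood. Two suppliers return one shared builder instance, and deduplicating by identity leaves a single store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for StoreBuilder.
final class StoreBuilderStub {
    final String name;

    StoreBuilderStub(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the proposed StateStoresSupplier: suppliers opt in
// by overriding stores(); the default declares no stores.
interface StoresSupplier {
    default List<StoreBuilderStub> stores() {
        return Collections.emptyList();
    }
}

// Does not opt in, so it keeps the old behaviour.
final class StatelessSupplier implements StoresSupplier { }

// Returns the shared builder instance it was constructed with.
final class SharingSupplier implements StoresSupplier {
    private final StoreBuilderStub shared;

    SharingSupplier(StoreBuilderStub shared) {
        this.shared = shared;
    }

    @Override
    public List<StoreBuilderStub> stores() {
        return List.of(shared);
    }
}

final class Wiring {
    // Collects every declared builder, keeping each distinct *instance* only once.
    static List<StoreBuilderStub> wire(List<? extends StoresSupplier> suppliers) {
        Set<StoreBuilderStub> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        List<StoreBuilderStub> added = new ArrayList<>();
        for (StoresSupplier supplier : suppliers) {
            for (StoreBuilderStub builder : supplier.stores()) {
                if (seen.add(builder)) {
                    added.add(builder);
                }
            }
        }
        return added;
    }
}
```

Wiring two `SharingSupplier` objects built around the same `StoreBuilderStub` plus one `StatelessSupplier` yields a single builder, while a supplier that does not override `stores()` contributes nothing.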


This makes the KIP wiki a bit stale; I'll update if we want to bring this
design to a vote.

Thanks!
Paul


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-16 Thread Guozhang Wang
Matthias / Paul,

The concern I had about introducing `StoreBuilderSupplier` is simply that
it adds another XXSupplier to the public API, so I'd like to ask if
we really have to add it :)

The difference between encapsulating the store name and encapsulating the
full state store builder is that, in the former:

---

String storeName = "store1";
builder.addStore(new MyStoreBuilder(storeName));
// Following my proposal, the store name can be passed in and used both for
// `listStores` and in `Transformer#init`, so the Transformer function does
// not need to get the constant string name again.
stream1.transform(new MyTransformerSupplier(storeName));

// One caveat to admit is that the MyTransformerSupplier logic may be unique
// to `store1`, so it cannot be reused with a different store name anyway.
---

While in the latter:

---

// The class name just indicates that we may have one such supplier per store.
stream1.transform(new MyTransformerSupplierForStore1());

---

I understand the latter introduces more convenience in the API, but the
cost is that we still cannot completely remove `builder.addStore`; we only
reduce its semantic scope to shared state stores, hence users need to
learn two ways of creating state stores for those two patterns.

My argument is that more public APIs require a longer learning curve for
users and introduce more usage patterns that may confuse users (the
proposal I had tries to replace one with the other completely).


Guozhang


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-16 Thread Paul Whalen
Thanks for the great thoughts Matthias and Guozhang!

If I'm not mistaken, Guozhang's suggestion is my second alternative on
the KIP ("Have the added method on the Supplier interfaces only return
store names, not builders").  I do think it would be a worthwhile usability
improvement on its own, but to Matthias's point, it doesn't achieve the
full goal of completely encapsulating a state store and its processor: it
encapsulates the name, but not the StoreBuilder.

I'm really intrigued by Matthias's idea that forgoes the default interface
method I proposed.  Having smaller, separate interfaces is a powerful idea
and I think a cleaner API than what I proposed.  The non-shared store use
case is handled well here, and the shared store use case is possible,
though maybe still not as graceful as we would like (having to add the
StoreBuilderSupplier before the StoreNameSupplier seems maybe too subtle to
me).

We're all agreed that one of the big problems with the shared store use
case is how to deal with adding the same store to the topology multiple
times.  Catching the "store already added" exception is risky.  Here's a
maybe radical idea: change `topology.addStateStore()` to be idempotent for
adding a given state store name and `StoreBuilder`.  In other words,
`addStateStore` would not throw the "store already added" exception if the
`StoreBuilder` being added for a given name has the same identity as the
one that has already been added.  Does this eliminate all the bugs we're
worried about?  Thinking about it for a few minutes, it seems to eliminate
most at least (would a user really use the exact same StoreBuilder when
they intend there to be two stores?).  It might make the API slightly
harder to use if a user isn't immediately aware of that subtlety, but a
good error message should ease the pain, and it would happen immediately
during development.

And with regard to Matthias's comment about whether we need to deprecate
existing varargs transform methods - I don't think we need to, but it might
be nice for there only to be one way to do things, assuming whatever we
come up with supports all existing use cases.  I don't feel strongly about
this, but if we don't deprecate, I do think it's important to add checks
that prevent users from trying to do the same thing in two different ways,
as we've discussed.
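A check of the kind mentioned above might look roughly like this sketch. `DeclaresStores` and `TransformCallValidator.checkSingleMechanism` are hypothetical names chosen for illustration; the only idea shown is that a call which both passes explicit store names and uses a supplier that declares its own stores is rejected up front instead of being silently merged.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a Transformer/ProcessorSupplier that may declare
// its own stores; by default it declares none (the old behaviour).
interface DeclaresStores {
    default List<String> declaredStoreNames() {
        return Collections.emptyList();
    }
}

final class TransformCallValidator {
    // Mirrors the vararg shape transform(supplier, String... stateStoreNames):
    // either the caller names the stores or the supplier declares them, not both.
    static void checkSingleMechanism(DeclaresStores supplier, String... stateStoreNames) {
        boolean declaredBySupplier = !supplier.declaredStoreNames().isEmpty();
        boolean passedByCaller = stateStoreNames.length > 0;
        if (declaredBySupplier && passedByCaller) {
            throw new IllegalArgumentException(
                "Declare stores either via the supplier or via transform(), not both");
        }
    }
}
```

Either mechanism on its own passes the check; only the combination throws.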

Paul


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-16 Thread Matthias J. Sax
Guozhang,

>> Regarding the last option to catch the "store exists already" exception and
>> fall back to connecting stores, I'm a bit concerned it may be hiding actual
>> user bugs.

I agree with this concern. From my original email:

> The only disadvantage I see is potential bugs from sharing state
> if two different stores are named the same by mistake (this would
> not be detected).


For your new proposal: I am not sure if it addresses Paul's original
idea -- I hope Paul can clarify. From my understanding, the idea was to
encapsulate a store and its processor. As many stores are not shared,
this seems to be quite useful. Your proposal falls a little short of
supporting encapsulation for non-shared stores.


-Matthias




Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-14 Thread Guozhang Wang
Matthias,

Thanks for your feedback.

Regarding the last option to catch the "store exists already" exception and
fall back to connecting stores, I'm a bit concerned it may be hiding actual
user bugs.

Thinking about Paul's proposal and your suggestion again, I'd like to
propose another alternative somewhere in the middle of your approaches,
i.e. we still let users create shareable state stores via
`addStateStore`, and we allow the TransformerSupplier to return a list of
the names of the state stores it needs, e.g.:

public interface TransformerSupplier<K, V, R> {
    Transformer<K, V, R> get();

    default List<String> stateStoreNames() {
        return Collections.emptyList();
    }
}

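By doing this, users can still "consolidate" the references to store names
in a single place in the transform call, e.g.:

public class MyTransformerSupplier
        implements TransformerSupplier<String, String, KeyValue<String, String>> {
    private final String storeName;

    public MyTransformerSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Transformer<String, String, KeyValue<String, String>> get() {
        return new MyTransformer();
    }

    // The DSL connects the returned stores to this transformer.
    @Override
    public List<String> stateStoreNames() {
        return Collections.singletonList(storeName);
    }

    private class MyTransformer
            implements Transformer<String, String, KeyValue<String, String>> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            store = (KeyValueStore<String, String>) context.getStateStore(storeName);
        }

        // transform() and close() omitted for brevity
    }
}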

Basically, we move the parameters from the caller of `transform` to inside
the TransformerSuppliers. DSL implementations would not change much, simply
calling `connectStateStore` by getting the list of names from the provided
function.
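To make the "DSL implementations would not change much" point concrete, here is a sketch using hypothetical stand-ins (`NamedStoresSupplier`, `MiniTopology`) rather than the real `Topology` API. The user still adds the shared store once; each `transform()` call just reads the names from the supplier and connects them, so reusing one supplier for two `transform()` calls never tries to add the store twice.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the proposed TransformerSupplier#stateStoreNames().
interface NamedStoresSupplier {
    default List<String> stateStoreNames() {
        return Collections.emptyList();
    }
}

final class MiniTopology {
    private final Set<String> stores = new HashSet<>();
    private final Map<String, List<String>> connections = new HashMap<>();

    // Shared stores are still added explicitly, once, by the user.
    void addStateStore(String storeName) {
        if (!stores.add(storeName)) {
            throw new IllegalStateException("Store " + storeName + " already added");
        }
    }

    // What the DSL would do: connect the new processor to every store the supplier names.
    void transform(String processorName, NamedStoresSupplier supplier) {
        List<String> names = supplier.stateStoreNames();
        for (String name : names) {
            if (!stores.contains(name)) {
                throw new IllegalStateException("Store " + name + " has not been added");
            }
        }
        connections.put(processorName, new ArrayList<>(names));
    }

    List<String> storesOf(String processorName) {
        return connections.getOrDefault(processorName, Collections.emptyList());
    }
}
```

Naming a store that was never added still fails, so this variant keeps the explicit `addStateStore` step for shared stores, as the proposal intends.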

Guozhang



Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-13 Thread Matthias J. Sax
Just a meta comment: do we really need to deprecate existing
`transform()` etc methods?

The last argument is a vararg, and thus just keeping the existing API
for this part seems to work too, allowing both patterns to be implemented?

Also, instead of adding a default method, we could add a new
interface `StoreBuilderSupplier` with method `List<StoreBuilder<?>>
stateStores()` -- users could implement `TransformerSupplier` and
`StoreBuilderSupplier` at once, and in this case we would require that users
don't provide store names in `transform()`.

Similarly, we could add an interface `StoreNameSupplier` with method
`List<String> stateStores()`. This would allow "auto-wiring" a transformer
to existing stores (to avoid the issue of adding the same store multiple
times).

Hence, for shared stores, there would be one "main" transformer that
implements `StoreBuilderSupplier` and that must be added first to the
topology. The other transformers would implement `StoreNameSupplier` and
just connect to those stores.
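A rough model of the two-interface idea, mainly to make the ordering constraint concrete. `BuilderSupplier` and `NameSupplier` are simplified, hypothetical versions of the suggested `StoreBuilderSupplier` and `StoreNameSupplier` (store builders are reduced to their names), and `OrderedWiring` is an invented stand-in for the DSL. The "main" transformer adds the store; the others only connect, so wiring one of them first fails.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical: a transformer supplier that owns (adds) its stores.
interface BuilderSupplier {
    List<String> storesToAdd();
}

// Hypothetical: a transformer supplier that only connects to existing stores.
interface NameSupplier {
    List<String> storesToConnect();
}

final class OrderedWiring {
    private final Set<String> added = new HashSet<>();

    // Accepts any supplier; one object may implement both interfaces.
    void wire(Object transformerSupplier) {
        if (transformerSupplier instanceof BuilderSupplier) {
            for (String name : ((BuilderSupplier) transformerSupplier).storesToAdd()) {
                if (!added.add(name)) {
                    throw new IllegalStateException("Store " + name + " already added");
                }
            }
        }
        if (transformerSupplier instanceof NameSupplier) {
            for (String name : ((NameSupplier) transformerSupplier).storesToConnect()) {
                if (!added.contains(name)) {
                    throw new IllegalStateException(
                        "Store " + name + " must be added by a BuilderSupplier first");
                }
            }
        }
    }
}
```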

Another possibility to avoid the issue of adding the same stores
multiple times would be for the DSL to always call `addStateStore()`, but
catch a potential "store exists already" exception and fall back to
`connectProcessorAndStateStores()` in this case. Thus, we would not need
the `StoreNameSupplier` interface, and the order in which transformers
are added would not matter either. The only disadvantage I see is
potential bugs from sharing state if two different stores are named the
same by mistake (this would not be detected).
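The fallback idea, and the undetected-bug risk it carries, can be sketched as follows. `StrictTopology`, `StoreAlreadyAddedException`, and `addOrConnect` are hypothetical stand-ins, not Kafka classes. Because the fallback only sees a name clash, a second, different builder that reuses a name by mistake is silently turned into a connection to the first store.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical exception standing in for "store exists already".
final class StoreAlreadyAddedException extends RuntimeException {
    StoreAlreadyAddedException(String name) {
        super("Store " + name + " already added");
    }
}

// Hypothetical topology whose addStateStore() is strict about names.
final class StrictTopology {
    final Map<String, Object> builders = new HashMap<>();

    void addStateStore(String name, Object builder) {
        if (builders.putIfAbsent(name, builder) != null) {
            throw new StoreAlreadyAddedException(name);
        }
    }

    // The suggested DSL behaviour: try to add, fall back to "connect" on conflict.
    // Returns true if the processor ended up connected to a pre-existing store.
    boolean addOrConnect(String name, Object builder) {
        try {
            addStateStore(name, builder);
            return false;
        } catch (StoreAlreadyAddedException e) {
            // Only the name is compared, so a *different* builder is silently
            // treated as a request to share the existing store.
            return true;
        }
    }
}
```

Calling `addOrConnect("store1", builderA)` and then `addOrConnect("store1", builderB)` leaves only `builderA` registered: the store intended by `builderB` never exists, and nothing reports it.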



Just some ideas I wanted to share. What do you think?



-Matthias






Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-10 Thread Paul Whalen
Ah yes, of course, this was an oversight: I completely ignored the case of
multiple processors sharing the same state store when writing up the KIP.
That is funny, because I've actually done this (different processors sharing state
stores) a fair amount myself, and I've settled on a pattern where I group
the Processors in an enclosing class, and that enclosing class handles as
much as possible.  Here's a gist showing the rough structure, just for
context: https://gist.github.com/pgwhalen/57a00dcc2269b7610e1aaeb1549b3b65
Note how it adds the stores to the topology, as well as providing a
public method with the store names.

I don't think my proposal completely conflicts with the multiple processors
sharing state stores use case, since you can create a supplier that
provides the store name you want, somewhat independently of your actual
Processor logic.  The issue I do see though, is that
topology.addStateStore() can only be called once for a given store.  So for
your example, if there were a single TransformerSupplier that was passed
into both transform() calls, "store1" would be added (under the hood) to
the topology twice, which is no good.

Perhaps this suggests that one of my alternatives on the KIP might be
desirable: either not having the suppliers return StoreBuilders (just store
names), or not deprecating the old methods that take "String...
stateStoreNames". I'll have to think about it a bit.

Paul

On Sun, Dec 9, 2018 at 11:57 PM Guozhang Wang  wrote:

> Hello Paul,
>
> Thanks for the great writeup (very detailed and crystal-clear motivation
> sections!).
>
> This is quite an interesting idea and I do like the API cleanness you
> proposed. The original motivation for letting StreamsTopology add state
> stores, though, is to allow different processors to share a state store.
> For example:
>
> builder.addStore("store1");
>
> // a path of stream transformations that leads to KStream stream1.
> stream1.transform(..., "store1");
>
> // another path that generates a KStream stream2.
> stream2.transform(..., "store1");
>
> Behind the scenes, Streams will make sure the stream1 / stream2
> transformations are always grouped together as a single group of tasks,
> each of which is executed by a single thread, so there are no concurrency
> issues when accessing the store from different operators within the same
> task. I'm not sure how common this use case is, but I'd like to hear if you
> have any thoughts on maintaining it, since the current proposal seems to
> exclude this possibility.
>
>
> Guozhang
>
>
> On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen  wrote:
>
> > Here's KIP-401 for discussion, a minor Kafka Streams API change that I
> > think could greatly increase the usability of the low-level processor API.
> > I have some code written but will wait to see if there is buy-in before
> > going all out and creating a pull request.  It seems like most of the work
> > would be in updating documentation and tests.
> >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
> >
> > Thanks!
> > Paul
> >
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-09 Thread Guozhang Wang
Hello Paul,

Thanks for the great writeup (very detailed and crystal-clear motivation
sections!).

This is quite an interesting idea and I do like the API cleanness you
proposed. The original motivation of letting StreamsTopology add state
stores, though, is to allow different processors to share the state store.
For example:

builder.addStore("store1");

// a path of stream transformations that leads to KStream stream1.
stream1.transform(..., "store1");

// another path that generates a KStream stream2.
stream2.transform(..., "store1");

Behind the scenes, Streams will make sure the stream1 / stream2
transformations are always grouped together as a single group of tasks,
each of which is executed by a single thread, so there are no concurrency
issues when accessing the store from different operators within the same
task. I'm not sure how common this use case is, but I'd like to hear if you
have any thoughts on maintaining it, since the current proposal seems to
exclude this possibility.
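
One simple way to model the grouping described above is a union-find over
processors and stores: any two processors connected to the same store land in
the same group, and so in the same task. This is a simplified, hypothetical
sketch (StoreSharingGrouper is not a Kafka Streams class, and this is not its
actual grouping code); it ignores the parent/child edges between processors,
which also determine the grouping in a real topology.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model: a union-find over processor and store nodes.
// Processors connected to the same store end up with the same root (group).
class StoreSharingGrouper {
    private final Map<String, String> parent = new HashMap<>();

    // Returns the root of x's group, compressing the path along the way.
    private String find(String x) {
        parent.putIfAbsent(x, x);
        String root = x;
        while (!parent.get(root).equals(root)) {
            root = parent.get(root);
        }
        String current = x;
        while (!current.equals(root)) {
            String next = parent.get(current);
            parent.put(current, root);
            current = next;
        }
        return root;
    }

    private void union(String a, String b) {
        String rootA = find(a);
        String rootB = find(b);
        if (!rootA.equals(rootB)) {
            parent.put(rootA, rootB);
        }
    }

    // Connecting a processor to a store merges their groups.
    void connect(String processor, String store) {
        union("processor:" + processor, "store:" + store);
    }

    boolean sameGroup(String processorA, String processorB) {
        return find("processor:" + processorA).equals(find("processor:" + processorB));
    }
}
```

Connecting two transforms to "store1" puts them in one group, while a transform
that only uses some other store stays in its own group.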


Guozhang


On Sun, Dec 9, 2018 at 4:18 PM Paul Whalen  wrote:

> Here's KIP-401 for discussion, a minor Kafka Streams API change that I
> think could greatly increase the usability of the low-level processor API.
> I have some code written but will wait to see if there is buy-in before
> going all out and creating a pull request.  It seems like most of the work
> would be in updating documentation and tests.
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756
>
> Thanks!
> Paul
>


-- 
-- Guozhang


[DISCUSS] KIP-401 TransformerSupplier/ProcessorSupplier enhancements

2018-12-09 Thread Paul Whalen
Here's KIP-401 for discussion, a minor Kafka Streams API change that I
think could greatly increase the usability of the low-level processor API.
I have some code written but will wait to see if there is buy-in before
going all out and creating a pull request.  It seems like most of the work
would be in updating documentation and tests.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97553756

Thanks!
Paul