Re: [DISCUSS] KIP-862: Implement self-join optimization

2022-08-31 Thread Guozhang Wang
Thanks Vicky,

I do not have any further comments about the KIP.


Guozhang


Re: [DISCUSS] KIP-862: Implement self-join optimization

2022-08-30 Thread Vasiliki Papavasileiou
Hi Guozhang,

That's an excellent idea, I will make the changes. I was also going back
and forth on whether to have a specific config for each optimization, but I
feel your approach gives us the best of both worlds.

Thank you,
Vicky


Re: [DISCUSS] KIP-862: Implement self-join optimization

2022-08-27 Thread Guozhang Wang
Hello Vicky,

I made a quick pass on your WIP PR and now I understand and agree that
compatibility is indeed preserved, since we get the optimized topology in a
second pass and hence we already "used and burned" the original topology's
naming suffixes in the first pass.
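The "used and burned" argument can be illustrated with a toy two-pass name generator. This is a minimal sketch, assuming (hypothetically) that every node and store takes its suffix from one shared, monotonically increasing counter during the first pass, and that the optimization pass only drops a store without re-running name generation. The classes and prefixes (`NameGenerator`, `JOIN-OTHER-STORE-`, etc.) are made up for illustration and are not the Kafka Streams internals.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of suffix-based naming; not the Kafka Streams implementation.
class NameGenerator {
    private int index = 0;

    // Every call consumes ("burns") one index, whether or not the name survives later.
    String next(String prefix) {
        return prefix + String.format("%010d", index++);
    }
}

class TwoPassNaming {
    // First pass: names are assigned as if the join were unoptimized (two stores).
    static List<String> firstPass() {
        NameGenerator gen = new NameGenerator();
        List<String> names = new ArrayList<>();
        names.add(gen.next("SOURCE-"));
        names.add(gen.next("JOIN-THIS-STORE-"));
        names.add(gen.next("JOIN-OTHER-STORE-"));
        names.add(gen.next("JOIN-MERGE-"));
        names.add(gen.next("SINK-"));
        return names;
    }

    // Second pass: the hypothetical self-join rewrite drops the second store,
    // but every remaining name (and its suffix) is carried over unchanged.
    static List<String> optimize(List<String> names) {
        List<String> result = new ArrayList<>(names);
        result.removeIf(n -> n.startsWith("JOIN-OTHER-STORE-"));
        return result;
    }
}
```

In this toy model the sink keeps the suffix `0000000004` after optimization; if the second pass re-ran naming from scratch, it would shift to `0000000003`, which is the kind of rename that would break a running app.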

Regarding the configuration patterns, I still have a bit of concern:
primarily, if we follow this pattern of introducing a new config for each
optimization rule, in the future we would have a lot of configs --- one per
rule --- inside the StreamsConfig. I thought about this back and forth
again and still feel that this may not be what we want. I think instead we
can change the existing `TOPOLOGY_OPTIMIZATION_CONFIG` to accept a
comma-separated list of strings --- this aligns with other similar configs
as well --- so that for different scenarios users can choose either
fine-grained or coarse-grained control, e.g.:

* I just want to enable all rules, or none: "all", "none".
* I know my app was created with Kafka version X, and I only want to apply
the rules that already existed as of version X: "versionX" --- I just made
this one up for future use cases, since we discussed it in the original KIP
that introduced "TOPOLOGY_OPTIMIZATION_CONFIG"; we do not need to include it
in this KIP.
* I know my app is compatible with specific rules A/B/C, and I just want to
always enable those and not others: "ruleA,ruleB,ruleC".

So far we only have a few rules: a) reuse the source topic as the changelog
topic for KTables, b) merge duplicate repartition topics, and c) self-join
(this KIP). So I suggest that in this KIP we just make
`TOPOLOGY_OPTIMIZATION_CONFIG` accept a list of strings, but 1) check that
some strings cannot coexist (e.g. `none` and `all`), and 2) add a new string
value for self-join itself. In this way:

* People who chose `none` before will not be impacted.
* People who chose `all` before will get this optimization by default, and
it's backward compatible so it's okay; they also get what they meant: I
just want "all" :)
* Advanced users who read about this KIP and want just this optimization but
not the others will change their config from `none` to `self-join`.
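The validation described above could look roughly like this. It is a minimal sketch, assuming a hypothetical `OptimizationConfigParser` class and placeholder rule names (`reuse-ktable-source`, `merge-repartition`, `self-join`); only `self-join` is named in this thread, and the final config values may differ.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical parser for a comma-separated TOPOLOGY_OPTIMIZATION_CONFIG value.
class OptimizationConfigParser {
    // Placeholder names for the three rules mentioned in this thread.
    static final List<String> ALL_RULES =
        List.of("reuse-ktable-source", "merge-repartition", "self-join");

    static Set<String> parse(String value) {
        Set<String> tokens = new LinkedHashSet<>();
        for (String raw : value.split(",")) {
            String token = raw.trim();
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        boolean hasAll = tokens.contains("all");
        boolean hasNone = tokens.contains("none");
        // "all" and "none" are only meaningful on their own.
        if ((hasAll || hasNone) && tokens.size() > 1) {
            throw new IllegalArgumentException(
                "'all' and 'none' cannot be combined with other values: " + value);
        }
        if (hasAll) {
            return new LinkedHashSet<>(ALL_RULES);
        }
        if (hasNone) {
            return new LinkedHashSet<>();
        }
        // Fine-grained mode: every listed rule must be a known one.
        for (String token : tokens) {
            if (!ALL_RULES.contains(token)) {
                throw new IllegalArgumentException("Unknown optimization rule: " + token);
            }
        }
        return tokens;
    }
}
```

With this shape, `"all"` picks up any rule added in later releases automatically, while an explicit list such as `"self-join"` pins the app to exactly the rules it names.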

WDYT?


Guozhang





Re: [DISCUSS] KIP-862: Implement self-join optimization

2022-08-12 Thread John Roesler
Thanks for the KIP, Vicky!

Re 1/2, I agree with what you both worked out. 

Re 3: It sounds like you were able to preserve backward compatibility, so I 
don’t think you need to add any new configs. I think you can just switch it on 
if people specify “all”. 

Thanks!
-John



Re: [DISCUSS] KIP-862: Implement self-join optimization

2022-08-11 Thread Guozhang Wang
Thanks Vicky for your reply!

Re 1/2): I think you have a great point here about adhering to the existing
implementation; I'm convinced. In that case we do not need to consider
left/outer-joins, and hence do not need to worry about the extra store in
the impl.

Re 3): I'm curious how compatibility is preserved, since with
optimizations turned on we would use fewer stores and hence the store name
suffixes would change. In your experiment, did you explicitly specify the
store names, e.g. via Materialized? I'd be glad if it turns out to really
be conveniently backward compatible, and to put my concerns to rest :)


Guozhang


Re: [DISCUSS] KIP-862: Implement self-join optimization

2022-08-11 Thread Vasiliki Papavasileiou
Hi Guozhang,

Thank you very much for your comments.

Regarding 1: the extra state store is only needed in outer joins, since
that's the only case where we have non-joining records that would need to
get emitted when the window closes, right? If we do decide to go with an
outer-join implementation, I will make sure to have the extra state store
as well. Thank you for pointing it out.

Regarding 2: As the self-join is only a physical optimization over an inner
join whose two arguments are the same entity, it should return the same
results as the inner join. We wouldn't want a user upgrading and enabling
the optimization to suddenly see that their joins behave differently and
produce different results.
As an example, consider the records  and  where A is the key and
the number is the value and both are strings. Assume these records are
piped into an input topic. And assume we have a self-join (not optimized,
so inner join implementation) whose joiner concatenates the values.
The output of the join after processing the first record is : .
The output of the join after processing the second record is: ,
, 
So, for an inner join whose two arguments are the same stream, a record
does join with itself. And as a user, I would expect the self-join
optimization to produce the same results. What do you think?
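The semantics above (a record joins with itself) can be modeled in memory. This is a minimal sketch, assuming a single list acting as the join store, a symmetric time window, and a joiner that concatenates values; `InnerSelfJoinSketch` is hypothetical, it is not the Kafka Streams join processor, and the order of results emitted for one input record is arbitrary here. The values `x` and `y` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-store model of an inner self-join; not the Kafka Streams processor.
class InnerSelfJoinSketch {
    private record Entry(String key, String value, long timestamp) {}

    private final List<Entry> store = new ArrayList<>();
    private final long windowMs;

    InnerSelfJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Processes one input record and returns the join results it triggers.
    List<String> process(String key, String value, long timestamp) {
        List<String> out = new ArrayList<>();
        for (Entry e : store) {
            if (e.key().equals(key) && Math.abs(e.timestamp() - timestamp) <= windowMs) {
                // Both join inputs are the same stream, so each earlier match yields
                // one result with the new record on the left and one with it on the right.
                out.add(value + e.value());
                out.add(e.value() + value);
            }
        }
        store.add(new Entry(key, value, timestamp));
        // Inner-join semantics over the same stream: the record also matches itself.
        out.add(value + value);
        return out;
    }
}
```

Feeding `("K", "x", 0)` and then `("K", "y", 10)` with a 1000 ms window yields `[xx]` and then `[yx, xy, yy]`; an optimized single-store path would be expected to produce the same results as the unoptimized two-store inner join.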

Regarding 3: I did a small experiment and I think my changes are
backwards compatible. Basically, I created a topology without the
optimization, had it process some data and killed it. Then I started it
again with the optimization turned on, and processing resumed fine: there
was no exception, no extra state stores were created, and the join
results made sense. The optimization keeps the same state store and
doesn't change the names or indices of nodes in the topology. I will,
however, need to add a case for self-joins in the upgrade system tests to
make sure that things don't break. Is this sufficient?
Regarding the config, one way to go would be to have one config per
optimization, but I am worried that this will get unwieldy if in the future
we have a lot of them, and it also requires users to know about the
optimizations to be able to benefit from them. Another alternative is to
assume that if TOPOLOGY_OPTIMIZATION_CONFIG is on (`all`), then all
optimizations are applied. If the user doesn't want a specific
optimization, they need to turn that one off. So, we would have a
config per optimization, but they would be on by default.
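The "on by default, opt out per rule" alternative can be sketched as follows. This assumes a hypothetical `PerRuleFlagsSketch` class, placeholder rule names, and a map of per-rule boolean flags standing in for individual configs; none of these are real StreamsConfig entries.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical resolution of "all" plus per-rule opt-out flags; names are placeholders.
class PerRuleFlagsSketch {
    static final List<String> ALL_RULES =
        List.of("reuse-ktable-source", "merge-repartition", "self-join");

    // optimization: "all" or "none"; ruleFlags: per-rule overrides, absent means enabled.
    static Set<String> enabledRules(String optimization, Map<String, Boolean> ruleFlags) {
        Set<String> enabled = new LinkedHashSet<>();
        if (!"all".equals(optimization)) {
            return enabled; // "none": per-rule flags are ignored
        }
        for (String rule : ALL_RULES) {
            if (ruleFlags.getOrDefault(rule, true)) {
                enabled.add(rule);
            }
        }
        return enabled;
    }
}
```

The trade-off is visible in the signature: users only need to know about a rule when they want to disable it, but the number of flags still grows with the number of rules.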

Best,
Vicky


Re: [DISCUSS] KIP-862: Implement self-join optimization

2022-08-09 Thread Guozhang Wang
Hello Vicky,

Thanks for the KIP! I made a quick pass and here are some quick thoughts:

1. Store Implementation: this may not be directly related to the KIP itself
since it's all internal, but the stream-stream join state store
implementation was changed in
https://issues.apache.org/jira/browse/KAFKA-10847, where we added a
separate store to maintain all the records that have not found a match yet,
and emit them once the window has passed, for left/outer joins. For this
optimization, I think we can still go with a single store, but we need to
make sure we do not regress on KAFKA-10847, i.e. records that do not find a
match should also be emitted once time has passed. This would likely rely
on the ability to range over the single store's "expired" records. A
good reference would be the recent work to allow emitting final results for
windowed aggregations (cc @Hao Li  who can provide
some more references).

2. Join Semantics and Outer-Joins: I think we need to clarify whether any
single stream record would itself be considered a "match" for itself,
OR whether only a different record with the same key and within the join
window counts as a "match". If it's the former, then I agree that
outer-joins (even left-joins, right?) would not make sense, since we would
always find at least one match for any record; if it's the latter, then
outer/left joins still make sense and we would need to consider the store
implementation as stated in 1) above. Personally, I think the latter is
better --- I know it departs a bit from the RDBMS self-join semantics, but
in an RDBMS self-joins are usually not on PKs but on FKs, so I think those
semantics are less relevant to what we are considering here for windowed
stream-stream joins, which are still on PKs.

3. Compatibility: first of all, I think we should introduce new values for
the TOPOLOGY_OPTIMIZATION_CONFIG for this specific optimization in addition
to `all` and `none`, this is also what we discussed before to keep
compatibility. But for applications that are already running, we'd also
need to make sure that after a rolling bounce with this config value
changed, we would not break the app. That involves: a) the store names (and
hence the changelog names) should not change -- when we use suffixes, we
should make sure they do not change by burning some suffixes as well, b)
the processor names, similar to store names, c) store formats, if we ever
change the store formats, we need to consider a live upgrade path as well.
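To make the "latter" semantics in point 2 concrete, and to show why it would need the KAFKA-10847-style handling from point 1, here is a minimal sketch of a left self-join over a single store in which a record never matches itself. It assumes a hypothetical `LeftSelfJoinSketch` class, a symmetric window with no grace period, a `+`-joining joiner, and expiry driven by the maximum timestamp seen so far; it is not the Kafka Streams implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical single-store left self-join where a record never matches itself.
class LeftSelfJoinSketch {
    private static final class Entry {
        final String key;
        final String value;
        final long timestamp;
        boolean matched;

        Entry(String key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final List<Entry> store = new ArrayList<>();
    private final long windowMs;
    private long streamTime = Long.MIN_VALUE;

    LeftSelfJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    List<String> process(String key, String value, long timestamp) {
        List<String> out = new ArrayList<>();
        Entry current = new Entry(key, value, timestamp);
        for (Entry e : store) {
            // Only a *different* record with the same key inside the window is a match.
            if (e.key.equals(key) && Math.abs(e.timestamp - timestamp) <= windowMs) {
                out.add(value + "+" + e.value);
                out.add(e.value + "+" + value);
                e.matched = true;
                current.matched = true;
            }
        }
        store.add(current);
        streamTime = Math.max(streamTime, timestamp);
        // Range over expired records: unmatched ones are emitted with a null partner
        // (left-join behavior), then every expired record is dropped from the store.
        Iterator<Entry> it = store.iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.timestamp + windowMs < streamTime) {
                if (!e.matched) {
                    out.add(e.value + "+null");
                }
                it.remove();
            }
        }
        return out;
    }
}
```

With a 10 ms window, a record `a` at time 0 emits nothing immediately; it is emitted as `a+null` only once a record at time 100 advances stream time past its window, while records `b` (100) and `c` (105) match each other and produce `c+b` and `b+c`.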

Please let me know your thoughts.

Guozhang




-- 
-- Guozhang


[DISCUSS] KIP-862: Implement self-join optimization

2022-08-02 Thread Vasiliki Papavasileiou
Hello everyone,

I would like to start the discussion for KIP-862: Implement self-join
optimization

The KIP can be found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-862%3A+Self-join

Any suggestions are more than welcome.

Many thanks,
Vicky