Re: Kafka-Streams-Scala for Scala 3

2024-02-09 Thread Josep Prat
Hi Matthias,

Yes, it's just a matter of adding the [DISCUSS] prefix in the subject.
By the way, I didn't say this won't need a KIP, just that I won't be
pushing for it, but other maintainers might think it's needed.

For the DISCUSS thread, you should write down what changes are needed in the
build and what steps would be required to create the artifacts.

Best,

Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Fri, Feb 9, 2024, 19:55 Matthias Berndt 
wrote:

> Hey Josep,
>
> I'm glad you agree that a KIP is not needed here, and I agree with you that
> how to publish these artifacts should be discussed with the Kafka team. In
> fact, this is what I created this thread for 😁 This is my first time
> contributing to Kafka, so I'm going to have to ask what a DISCUSS thread
> is. Is it just a mailing list thread with a subject that starts with
> [DISCUSS], or is there more behind it?
>
> Best regards,
> Matthias
>
> Am Fr., 9. Feb. 2024 um 18:31 Uhr schrieb Josep Prat
> :
>
> > Hi Matthias,
> > It's not adding a new functionality but it's changing the way to generate
> > artifacts. In the end we are talking about generating a new binary.
> >
> > I could live with not having a KIP, but a DISCUSS thread I think it's
> > necessary. This signals the community members and maintainers that their
> > input is needed.
> >
> > I could help you with writing the KIP if you want.
> >
> > Best,
> >
> > Josep Prat
> > Open Source Engineering Director, aivenjosep.p...@aiven.io   |
> > +491715557497 | aiven.io
> > Aiven Deutschland GmbH
> > Alexanderufer 3-7, 10117 Berlin
> > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> > Amtsgericht Charlottenburg, HRB 209739 B
> >
> > On Fri, Feb 9, 2024, 18:02 Matthias Berndt 
> > wrote:
> >
> > > Hi Matthias, Hi Josep,
> > >
> > > I'm afraid I can't do the KIP thing as the signup process for Apache
> > > Confluence requires sending me a password reset link via E-Mail and
> said
> > > E-Mail doesn't seem to reach me for some reason. I've contacted the
> > Apache
> > > infrastructure team but haven't yet heard back from them.
> > > That said, I'd like to push back on the notion that a KIP is really
> > > necessary for this change. It's certainly not a “major new feature” as
> it
> > > adds zero extra functionality, and it doesn't affect binary
> compatibility
> > > either as all the currently supported Scala versions are still
> supported.
> > > This looks like a routine upgrade to me. Please, let's try to keep the
> > > administrative overhead to the required minimum, shall we?
> > > Thanks btw for merging Github PR #15239 (removal of
> > scala-collection-compat
> > > dependency from the 2.13 artifact). That will already improve life for
> > > Scala 3 users.
> > >
> > > All the best,
> > > Matthias
> > >
> > >
> > > Am Do., 8. Feb. 2024 um 18:02 Uhr schrieb Matthias J. Sax <
> > > mj...@apache.org
> > > >:
> > >
> > > > Josep,
> > > >
> > > > thanks for helping with this. I was also thinking if we might need a
> > KIP
> > > > for this change. Since you had the same though, I would say, yes,
> let's
> > > > do a KIP.
> > > >
> > > > @Matthias: can you prepare a KIP? You can read up on the details on
> the
> > > > wiki page:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > > >
> > > > If you have any questions about the process, please let us know.
> > > >
> > > > Thanks for pushing this forward!
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 2/8/24 8:08 AM, Matthias Berndt wrote:
> > > > > Hey Josep et al,
> > > > >
> > > > > I've created a ticket regarding this.
> > > > > https://issues.apache.org/jira/browse/KAFKA-16237
> > > > >
> > > > > All the best,
> > > > > Matthias
> > > > >
> > > > > Am Do., 8. Feb. 2024 um 11:42 Uhr schrieb Josep Prat
> > > > > :
> > > > >>
> > > > >> Go ahead and ask for a JIRA and Wiki account (Confluence). Let us
> > know
> > > > when
> > > > >> your accounts are created and we'll properly set them up so you
> can
> > > > create
> > > > >> and assign tickets to you.
> > > > >>
> > > > >> Best,
> > > > >>
> > > > >> On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt <
> > > > matthias.ber...@ttmzero.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Thanks Josep, I've applied for a JIRA account and addressed your
> > > > >>> review comments.
> > > > >>>
> > > > >>> Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
> > > > >>> :
> > > > 
> > > >  Hi Matthias,
> > > > 
> > > >  I think for this particular case it would be worth creating a
> JIRA
> > > > ticket
> > > >  for this as it's a new "feature".
> > > >  Regarding the change itself, I think we need to clarify how the
> > > > release
> > > >  process would work. Right now, the script `gradlewAll` is used
> > > (which
> > > > >

Re: Kafka-Streams-Scala for Scala 3

2024-02-09 Thread Matthias Berndt
Hey Josep,

I'm glad you agree that a KIP is not needed here, and I agree with you that
how to publish these artifacts should be discussed with the Kafka team. In
fact, this is what I created this thread for 😁 This is my first time
contributing to Kafka, so I'm going to have to ask what a DISCUSS thread
is. Is it just a mailing list thread with a subject that starts with
[DISCUSS], or is there more behind it?

Best regards,
Matthias

Am Fr., 9. Feb. 2024 um 18:31 Uhr schrieb Josep Prat
:

> Hi Matthias,
> It's not adding a new functionality but it's changing the way to generate
> artifacts. In the end we are talking about generating a new binary.
>
> I could live with not having a KIP, but a DISCUSS thread I think it's
> necessary. This signals the community members and maintainers that their
> input is needed.
>
> I could help you with writing the KIP if you want.
>
> Best,
>
> Josep Prat
> Open Source Engineering Director, aivenjosep.p...@aiven.io   |
> +491715557497 | aiven.io
> Aiven Deutschland GmbH
> Alexanderufer 3-7, 10117 Berlin
> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> Amtsgericht Charlottenburg, HRB 209739 B
>
> On Fri, Feb 9, 2024, 18:02 Matthias Berndt 
> wrote:
>
> > Hi Matthias, Hi Josep,
> >
> > I'm afraid I can't do the KIP thing as the signup process for Apache
> > Confluence requires sending me a password reset link via E-Mail and said
> > E-Mail doesn't seem to reach me for some reason. I've contacted the
> Apache
> > infrastructure team but haven't yet heard back from them.
> > That said, I'd like to push back on the notion that a KIP is really
> > necessary for this change. It's certainly not a “major new feature” as it
> > adds zero extra functionality, and it doesn't affect binary compatibility
> > either as all the currently supported Scala versions are still supported.
> > This looks like a routine upgrade to me. Please, let's try to keep the
> > administrative overhead to the required minimum, shall we?
> > Thanks btw for merging Github PR #15239 (removal of
> scala-collection-compat
> > dependency from the 2.13 artifact). That will already improve life for
> > Scala 3 users.
> >
> > All the best,
> > Matthias
> >
> >
> > Am Do., 8. Feb. 2024 um 18:02 Uhr schrieb Matthias J. Sax <
> > mj...@apache.org
> > >:
> >
> > > Josep,
> > >
> > > thanks for helping with this. I was also thinking if we might need a
> KIP
> > > for this change. Since you had the same though, I would say, yes, let's
> > > do a KIP.
> > >
> > > @Matthias: can you prepare a KIP? You can read up on the details on the
> > > wiki page:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> > >
> > > If you have any questions about the process, please let us know.
> > >
> > > Thanks for pushing this forward!
> > >
> > >
> > > -Matthias
> > >
> > > On 2/8/24 8:08 AM, Matthias Berndt wrote:
> > > > Hey Josep et al,
> > > >
> > > > I've created a ticket regarding this.
> > > > https://issues.apache.org/jira/browse/KAFKA-16237
> > > >
> > > > All the best,
> > > > Matthias
> > > >
> > > > Am Do., 8. Feb. 2024 um 11:42 Uhr schrieb Josep Prat
> > > > :
> > > >>
> > > >> Go ahead and ask for a JIRA and Wiki account (Confluence). Let us
> know
> > > when
> > > >> your accounts are created and we'll properly set them up so you can
> > > create
> > > >> and assign tickets to you.
> > > >>
> > > >> Best,
> > > >>
> > > >> On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt <
> > > matthias.ber...@ttmzero.com>
> > > >> wrote:
> > > >>
> > > >>> Thanks Josep, I've applied for a JIRA account and addressed your
> > > >>> review comments.
> > > >>>
> > > >>> Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
> > > >>> :
> > > 
> > >  Hi Matthias,
> > > 
> > >  I think for this particular case it would be worth creating a JIRA
> > > ticket
> > >  for this as it's a new "feature".
> > >  Regarding the change itself, I think we need to clarify how the
> > > release
> > >  process would work. Right now, the script `gradlewAll` is used
> > (which
> > >  basically runs the build with Scala version 2.12 and 2.13). If I
> > > >>> understand
> > >  your changes correctly, we would need to run the build 3 times:
> > >  - 1 with property scalaVersion 2.12
> > >  - 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
> > >  - 1 with scalaVersion 2.13 and streamsScalaVersion 3.1
> > > 
> > >  I think we should document this and discuss when to have this
> > feature.
> > > >>> If I
> > >  remember correctly from when I tried to update Kafka to Scala 3,
> the
> > > idea
> > >  was to push this to a Kafka 4.0 version because we didn't want to
> > > >>> maintain
> > >  more than 2 Scala versions at the same time. I would encourage if
> > not
> > >  having a KIP, at least open up a [DISCUSS] thread to clarify some
> of
> > > >>> these
> > >  points.
> > > 
> > >  I'll add some feedback on the PR itself regarding

Re: Kafka-Streams-Scala for Scala 3

2024-02-09 Thread Josep Prat
Hi Matthias,
It's not adding new functionality, but it does change the way we generate
artifacts. In the end we are talking about generating a new binary.

I could live with not having a KIP, but I think a DISCUSS thread is
necessary. It signals to community members and maintainers that their
input is needed.

I could help you with writing the KIP if you want.

Best,

Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B

On Fri, Feb 9, 2024, 18:02 Matthias Berndt 
wrote:

> Hi Matthias, Hi Josep,
>
> I'm afraid I can't do the KIP thing as the signup process for Apache
> Confluence requires sending me a password reset link via E-Mail and said
> E-Mail doesn't seem to reach me for some reason. I've contacted the Apache
> infrastructure team but haven't yet heard back from them.
> That said, I'd like to push back on the notion that a KIP is really
> necessary for this change. It's certainly not a “major new feature” as it
> adds zero extra functionality, and it doesn't affect binary compatibility
> either as all the currently supported Scala versions are still supported.
> This looks like a routine upgrade to me. Please, let's try to keep the
> administrative overhead to the required minimum, shall we?
> Thanks btw for merging Github PR #15239 (removal of scala-collection-compat
> dependency from the 2.13 artifact). That will already improve life for
> Scala 3 users.
>
> All the best,
> Matthias
>
>
> Am Do., 8. Feb. 2024 um 18:02 Uhr schrieb Matthias J. Sax <
> mj...@apache.org
> >:
>
> > Josep,
> >
> > thanks for helping with this. I was also thinking if we might need a KIP
> > for this change. Since you had the same though, I would say, yes, let's
> > do a KIP.
> >
> > @Matthias: can you prepare a KIP? You can read up on the details on the
> > wiki page:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> >
> > If you have any questions about the process, please let us know.
> >
> > Thanks for pushing this forward!
> >
> >
> > -Matthias
> >
> > On 2/8/24 8:08 AM, Matthias Berndt wrote:
> > > Hey Josep et al,
> > >
> > > I've created a ticket regarding this.
> > > https://issues.apache.org/jira/browse/KAFKA-16237
> > >
> > > All the best,
> > > Matthias
> > >
> > > Am Do., 8. Feb. 2024 um 11:42 Uhr schrieb Josep Prat
> > > :
> > >>
> > >> Go ahead and ask for a JIRA and Wiki account (Confluence). Let us know
> > when
> > >> your accounts are created and we'll properly set them up so you can
> > create
> > >> and assign tickets to you.
> > >>
> > >> Best,
> > >>
> > >> On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt <
> > matthias.ber...@ttmzero.com>
> > >> wrote:
> > >>
> > >>> Thanks Josep, I've applied for a JIRA account and addressed your
> > >>> review comments.
> > >>>
> > >>> Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
> > >>> :
> > 
> >  Hi Matthias,
> > 
> >  I think for this particular case it would be worth creating a JIRA
> > ticket
> >  for this as it's a new "feature".
> >  Regarding the change itself, I think we need to clarify how the
> > release
> >  process would work. Right now, the script `gradlewAll` is used
> (which
> >  basically runs the build with Scala version 2.12 and 2.13). If I
> > >>> understand
> >  your changes correctly, we would need to run the build 3 times:
> >  - 1 with property scalaVersion 2.12
> >  - 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
> >  - 1 with scalaVersion 2.13 and streamsScalaVersion 3.1
> > 
> >  I think we should document this and discuss when to have this
> feature.
> > >>> If I
> >  remember correctly from when I tried to update Kafka to Scala 3, the
> > idea
> >  was to push this to a Kafka 4.0 version because we didn't want to
> > >>> maintain
> >  more than 2 Scala versions at the same time. I would encourage if
> not
> >  having a KIP, at least open up a [DISCUSS] thread to clarify some of
> > >>> these
> >  points.
> > 
> >  I'll add some feedback on the PR itself regarding the changes.
> > 
> >  Best,
> > 
> >  On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt <
> > >>> matthias.ber...@ttmzero.com>
> >  wrote:
> > 
> > > Hi Matthias J., Hi Lucas, Hi Josep,
> > >
> > > Thank you for your encouraging responses regarding a Scala 3 port
> of
> > > Kafka-Streams-Scala, and apologies for the late response from my
> > side.
> > > I have now created a PR to port Kafka-Streams-Scala to Scala 3
> (while
> > > retaining support for 2.13 and 2.12). Almost no changes to the code
> > > were required and the tests also pass. Please take a look and let
> me
> > > know what you think :-)
> > > https://github.com/apache/kafka/pull/15338
> > >
> > > All the best
> > 

Re: Kafka-Streams-Scala for Scala 3

2024-02-09 Thread Matthias Berndt
Hi Matthias, Hi Josep,

I'm afraid I can't do the KIP thing as the signup process for Apache
Confluence requires sending me a password reset link via E-Mail and said
E-Mail doesn't seem to reach me for some reason. I've contacted the Apache
infrastructure team but haven't yet heard back from them.
That said, I'd like to push back on the notion that a KIP is really
necessary for this change. It's certainly not a “major new feature” as it
adds zero extra functionality, and it doesn't affect binary compatibility
either as all the currently supported Scala versions are still supported.
This looks like a routine upgrade to me. Please, let's try to keep the
administrative overhead to the required minimum, shall we?
Thanks btw for merging GitHub PR #15239 (removal of the scala-collection-compat
dependency from the 2.13 artifact). That will already improve life for
Scala 3 users.

All the best,
Matthias


Am Do., 8. Feb. 2024 um 18:02 Uhr schrieb Matthias J. Sax :

> Josep,
>
> thanks for helping with this. I was also thinking if we might need a KIP
> for this change. Since you had the same though, I would say, yes, let's
> do a KIP.
>
> @Matthias: can you prepare a KIP? You can read up on the details on the
> wiki page:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
>
> If you have any questions about the process, please let us know.
>
> Thanks for pushing this forward!
>
>
> -Matthias
>
> On 2/8/24 8:08 AM, Matthias Berndt wrote:
> > Hey Josep et al,
> >
> > I've created a ticket regarding this.
> > https://issues.apache.org/jira/browse/KAFKA-16237
> >
> > All the best,
> > Matthias
> >
> > Am Do., 8. Feb. 2024 um 11:42 Uhr schrieb Josep Prat
> > :
> >>
> >> Go ahead and ask for a JIRA and Wiki account (Confluence). Let us know
> when
> >> your accounts are created and we'll properly set them up so you can
> create
> >> and assign tickets to you.
> >>
> >> Best,
> >>
> >> On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt <
> matthias.ber...@ttmzero.com>
> >> wrote:
> >>
> >>> Thanks Josep, I've applied for a JIRA account and addressed your
> >>> review comments.
> >>>
> >>> Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
> >>> :
> 
>  Hi Matthias,
> 
>  I think for this particular case it would be worth creating a JIRA
> ticket
>  for this as it's a new "feature".
>  Regarding the change itself, I think we need to clarify how the
> release
>  process would work. Right now, the script `gradlewAll` is used (which
>  basically runs the build with Scala version 2.12 and 2.13). If I
> >>> understand
>  your changes correctly, we would need to run the build 3 times:
>  - 1 with property scalaVersion 2.12
>  - 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
>  - 1 with scalaVersion 2.13 and streamsScalaVersion 3.1
> 
>  I think we should document this and discuss when to have this feature.
> >>> If I
>  remember correctly from when I tried to update Kafka to Scala 3, the
> idea
>  was to push this to a Kafka 4.0 version because we didn't want to
> >>> maintain
>  more than 2 Scala versions at the same time. I would encourage if not
>  having a KIP, at least open up a [DISCUSS] thread to clarify some of
> >>> these
>  points.
> 
>  I'll add some feedback on the PR itself regarding the changes.
> 
>  Best,
> 
>  On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt <
> >>> matthias.ber...@ttmzero.com>
>  wrote:
> 
> > Hi Matthias J., Hi Lucas, Hi Josep,
> >
> > Thank you for your encouraging responses regarding a Scala 3 port of
> > Kafka-Streams-Scala, and apologies for the late response from my
> side.
> > I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
> > retaining support for 2.13 and 2.12). Almost no changes to the code
> > were required and the tests also pass. Please take a look and let me
> > know what you think :-)
> > https://github.com/apache/kafka/pull/15338
> >
> > All the best
> > Matthias
> >
> > Am Do., 1. Feb. 2024 um 16:35 Uhr schrieb Josep Prat
> > :
> >>
> >> Hi,
> >>
> >> For reference, prior work on this:
> >> https://github.com/apache/kafka/pull/11350
> >> https://github.com/apache/kafka/pull/11432
> >>
> >> Best,
> >>
> >> On Thu, Feb 1, 2024, 15:55 Lucas Brutschy  > .invalid>
> >> wrote:
> >>
> >>> Hi Matthiases,
> >>>
> >>> I know Scala 2 fairly well, so I'd be happy to review changes that
> >>> add
> >>> Scala 3 support. However, as Matthias S. said, it has to be driven
> >>> by
> >>> people who use Scala day-to-day, since I believe most Kafka Streams
> >>> committers are working with Java.
> >>>
> >>> Rewriting the tests to not use EmbeddedKafkaCluster seems like a
> >>> large
> >>> undertaking, so option 1 is the first thing we should explore.
> >>>
> >>> I don't have any experi

Re: Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Matthias J. Sax

Josep,

thanks for helping with this. I was also wondering whether we might need a KIP 
for this change. Since you had the same thought, I would say, yes, let's 
do a KIP.


@Matthias: can you prepare a KIP? You can read up on the details on the 
wiki page: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals


If you have any questions about the process, please let us know.

Thanks for pushing this forward!


-Matthias

On 2/8/24 8:08 AM, Matthias Berndt wrote:

Hey Josep et al,

I've created a ticket regarding this.
https://issues.apache.org/jira/browse/KAFKA-16237

All the best,
Matthias

Am Do., 8. Feb. 2024 um 11:42 Uhr schrieb Josep Prat
:


Go ahead and ask for a JIRA and Wiki account (Confluence). Let us know when
your accounts are created and we'll properly set them up so you can create
and assign tickets to you.

Best,

On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt 
wrote:


Thanks Josep, I've applied for a JIRA account and addressed your
review comments.

Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
:


Hi Matthias,

I think for this particular case it would be worth creating a JIRA ticket
for this as it's a new "feature".
Regarding the change itself, I think we need to clarify how the release
process would work. Right now, the script `gradlewAll` is used (which
basically runs the build with Scala version 2.12 and 2.13). If I

understand

your changes correctly, we would need to run the build 3 times:
- 1 with property scalaVersion 2.12
- 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
- 1 with scalaVersion 2.13 and streamsScalaVersion 3.1

I think we should document this and discuss when to have this feature.

If I

remember correctly from when I tried to update Kafka to Scala 3, the idea
was to push this to a Kafka 4.0 version because we didn't want to

maintain

more than 2 Scala versions at the same time. I would encourage if not
having a KIP, at least open up a [DISCUSS] thread to clarify some of

these

points.

I'll add some feedback on the PR itself regarding the changes.

Best,

On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt <

matthias.ber...@ttmzero.com>

wrote:


Hi Matthias J., Hi Lucas, Hi Josep,

Thank you for your encouraging responses regarding a Scala 3 port of
Kafka-Streams-Scala, and apologies for the late response from my side.
I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
retaining support for 2.13 and 2.12). Almost no changes to the code
were required and the tests also pass. Please take a look and let me
know what you think :-)
https://github.com/apache/kafka/pull/15338

All the best
Matthias

Am Do., 1. Feb. 2024 um 16:35 Uhr schrieb Josep Prat
:


Hi,

For reference, prior work on this:
https://github.com/apache/kafka/pull/11350
https://github.com/apache/kafka/pull/11432

Best,

On Thu, Feb 1, 2024, 15:55 Lucas Brutschy 
.invalid>

wrote:


Hi Matthiases,

I know Scala 2 fairly well, so I'd be happy to review changes that

add

Scala 3 support. However, as Matthias S. said, it has to be driven

by

people who use Scala day-to-day, since I believe most Kafka Streams
committers are working with Java.

Rewriting the tests to not use EmbeddedKafkaCluster seems like a

large

undertaking, so option 1 is the first thing we should explore.

I don't have any experience with Scala 3 migration topics, but on

the

Scala website it says

The first piece of good news is that the Scala 3 compiler is

able to

read the Scala 2.13 Pickle format and thus it can type check code

that

depends on modules or libraries compiled with Scala 2.13.

One notable example is the Scala 2.13 library. We have indeed

decided

that the Scala 2.13 library is the official standard library for

Scala

3.

So wouldn't that mean that we are safe in terms of standard library
upgrades if we use core_2.13 in the tests?

Cheers,
Lucas


On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax 

wrote:


Thanks for raising this. The `kafka-streams-scala` module seems

to

be an

important feature for Kafka Streams and I am generally in favor

of

your

proposal to add Scala 3 support. However, I am personally no

Scala

person and it sounds like quite some overhead.

If you are willing to drive and own this initiative happy to

support

you

to the extend I can.

About the concrete proposal: my understanding is that :core will

move

off Scala long-term (not 100% sure what the timeline is, but new

modules

are written in Java only). Thus, down the road the compatibility

issue

would go away naturally, but it's unclear when.

Thus, if we can test kafak-stream-scala_3 with core_2.13 it

seems we

could add support for Scala 3 now, taking a risk that it might

break

in

the future assume that the migration off Scala from core is not

fast

enough.


For proposal (2), I don't think that it would be easily possible

for

unit/integration tests. We could fall back to system tests

though,

but

they would be much more heavy weight of course.

Might be good to hea

Re: Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Matthias Berndt
Hey Josep et al,

I've created a ticket regarding this.
https://issues.apache.org/jira/browse/KAFKA-16237

All the best,
Matthias

Am Do., 8. Feb. 2024 um 11:42 Uhr schrieb Josep Prat
:
>
> Go ahead and ask for a JIRA and Wiki account (Confluence). Let us know when
> your accounts are created and we'll properly set them up so you can create
> and assign tickets to you.
>
> Best,
>
> On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt 
> wrote:
>
> > Thanks Josep, I've applied for a JIRA account and addressed your
> > review comments.
> >
> > Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
> > :
> > >
> > > Hi Matthias,
> > >
> > > I think for this particular case it would be worth creating a JIRA ticket
> > > for this as it's a new "feature".
> > > Regarding the change itself, I think we need to clarify how the release
> > > process would work. Right now, the script `gradlewAll` is used (which
> > > basically runs the build with Scala version 2.12 and 2.13). If I
> > understand
> > > your changes correctly, we would need to run the build 3 times:
> > > - 1 with property scalaVersion 2.12
> > > - 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
> > > - 1 with scalaVersion 2.13 and streamsScalaVersion 3.1
> > >
> > > I think we should document this and discuss when to have this feature.
> > If I
> > > remember correctly from when I tried to update Kafka to Scala 3, the idea
> > > was to push this to a Kafka 4.0 version because we didn't want to
> > maintain
> > > more than 2 Scala versions at the same time. I would encourage if not
> > > having a KIP, at least open up a [DISCUSS] thread to clarify some of
> > these
> > > points.
> > >
> > > I'll add some feedback on the PR itself regarding the changes.
> > >
> > > Best,
> > >
> > > On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt <
> > matthias.ber...@ttmzero.com>
> > > wrote:
> > >
> > > > Hi Matthias J., Hi Lucas, Hi Josep,
> > > >
> > > > Thank you for your encouraging responses regarding a Scala 3 port of
> > > > Kafka-Streams-Scala, and apologies for the late response from my side.
> > > > I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
> > > > retaining support for 2.13 and 2.12). Almost no changes to the code
> > > > were required and the tests also pass. Please take a look and let me
> > > > know what you think :-)
> > > > https://github.com/apache/kafka/pull/15338
> > > >
> > > > All the best
> > > > Matthias
> > > >
> > > > Am Do., 1. Feb. 2024 um 16:35 Uhr schrieb Josep Prat
> > > > :
> > > > >
> > > > > Hi,
> > > > >
> > > > > For reference, prior work on this:
> > > > > https://github.com/apache/kafka/pull/11350
> > > > > https://github.com/apache/kafka/pull/11432
> > > > >
> > > > > Best,
> > > > >
> > > > > On Thu, Feb 1, 2024, 15:55 Lucas Brutschy  > > > .invalid>
> > > > > wrote:
> > > > >
> > > > > > Hi Matthiases,
> > > > > >
> > > > > > I know Scala 2 fairly well, so I'd be happy to review changes that
> > add
> > > > > > Scala 3 support. However, as Matthias S. said, it has to be driven
> > by
> > > > > > people who use Scala day-to-day, since I believe most Kafka Streams
> > > > > > committers are working with Java.
> > > > > >
> > > > > > Rewriting the tests to not use EmbeddedKafkaCluster seems like a
> > large
> > > > > > undertaking, so option 1 is the first thing we should explore.
> > > > > >
> > > > > > I don't have any experience with Scala 3 migration topics, but on
> > the
> > > > > > Scala website it says
> > > > > > > The first piece of good news is that the Scala 3 compiler is
> > able to
> > > > > > read the Scala 2.13 Pickle format and thus it can type check code
> > that
> > > > > > depends on modules or libraries compiled with Scala 2.13.
> > > > > > > One notable example is the Scala 2.13 library. We have indeed
> > decided
> > > > > > that the Scala 2.13 library is the official standard library for
> > Scala
> > > > 3.
> > > > > > So wouldn't that mean that we are safe in terms of standard library
> > > > > > upgrades if we use core_2.13 in the tests?
> > > > > >
> > > > > > Cheers,
> > > > > > Lucas
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax 
> > > > wrote:
> > > > > > >
> > > > > > > Thanks for raising this. The `kafka-streams-scala` module seems
> > to
> > > > be an
> > > > > > > important feature for Kafka Streams and I am generally in favor
> > of
> > > > your
> > > > > > > proposal to add Scala 3 support. However, I am personally no
> > Scala
> > > > > > > person and it sounds like quite some overhead.
> > > > > > >
> > > > > > > If you are willing to drive and own this initiative happy to
> > support
> > > > you
> > > > > > > to the extend I can.
> > > > > > >
> > > > > > > About the concrete proposal: my understanding is that :core will
> > move
> > > > > > > off Scala long-term (not 100% sure what the timeline is, but new
> > > > modules
> > > > > > > are written in Java only). Thus, down the road the compatibility
> > > > issue
> > > > > > > 

Re: Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Josep Prat
Go ahead and ask for a JIRA and Wiki (Confluence) account. Let us know when
your accounts are created and we'll set them up properly so you can create
tickets and assign them to yourself.

Best,

On Thu, Feb 8, 2024 at 11:32 AM Matthias Berndt 
wrote:

> Thanks Josep, I've applied for a JIRA account and addressed your
> review comments.
>
> Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
> :
> >
> > Hi Matthias,
> >
> > I think for this particular case it would be worth creating a JIRA ticket
> > for this as it's a new "feature".
> > Regarding the change itself, I think we need to clarify how the release
> > process would work. Right now, the script `gradlewAll` is used (which
> > basically runs the build with Scala version 2.12 and 2.13). If I
> understand
> > your changes correctly, we would need to run the build 3 times:
> > - 1 with property scalaVersion 2.12
> > - 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
> > - 1 with scalaVersion 2.13 and streamsScalaVersion 3.1
> >
> > I think we should document this and discuss when to have this feature.
> If I
> > remember correctly from when I tried to update Kafka to Scala 3, the idea
> > was to push this to a Kafka 4.0 version because we didn't want to
> maintain
> > more than 2 Scala versions at the same time. I would encourage if not
> > having a KIP, at least open up a [DISCUSS] thread to clarify some of
> these
> > points.
> >
> > I'll add some feedback on the PR itself regarding the changes.
> >
> > Best,
> >
> > On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt <
> matthias.ber...@ttmzero.com>
> > wrote:
> >
> > > Hi Matthias J., Hi Lucas, Hi Josep,
> > >
> > > Thank you for your encouraging responses regarding a Scala 3 port of
> > > Kafka-Streams-Scala, and apologies for the late response from my side.
> > > I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
> > > retaining support for 2.13 and 2.12). Almost no changes to the code
> > > were required and the tests also pass. Please take a look and let me
> > > know what you think :-)
> > > https://github.com/apache/kafka/pull/15338
> > >
> > > All the best
> > > Matthias
> > >
> > > Am Do., 1. Feb. 2024 um 16:35 Uhr schrieb Josep Prat
> > > :
> > > >
> > > > Hi,
> > > >
> > > > For reference, prior work on this:
> > > > https://github.com/apache/kafka/pull/11350
> > > > https://github.com/apache/kafka/pull/11432
> > > >
> > > > Best,
> > > >
> > > > On Thu, Feb 1, 2024, 15:55 Lucas Brutschy  > > .invalid>
> > > > wrote:
> > > >
> > > > > Hi Matthiases,
> > > > >
> > > > > I know Scala 2 fairly well, so I'd be happy to review changes that
> add
> > > > > Scala 3 support. However, as Matthias S. said, it has to be driven
> by
> > > > > people who use Scala day-to-day, since I believe most Kafka Streams
> > > > > committers are working with Java.
> > > > >
> > > > > Rewriting the tests to not use EmbeddedKafkaCluster seems like a
> large
> > > > > undertaking, so option 1 is the first thing we should explore.
> > > > >
> > > > > I don't have any experience with Scala 3 migration topics, but on
> the
> > > > > Scala website it says
> > > > > > The first piece of good news is that the Scala 3 compiler is
> able to
> > > > > read the Scala 2.13 Pickle format and thus it can type check code
> that
> > > > > depends on modules or libraries compiled with Scala 2.13.
> > > > > > One notable example is the Scala 2.13 library. We have indeed
> decided
> > > > > that the Scala 2.13 library is the official standard library for
> Scala
> > > 3.
> > > > > So wouldn't that mean that we are safe in terms of standard library
> > > > > upgrades if we use core_2.13 in the tests?
> > > > >
> > > > > Cheers,
> > > > > Lucas
> > > > >
> > > > >
> > > > > On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax 
> > > wrote:
> > > > > >
> > > > > > Thanks for raising this. The `kafka-streams-scala` module seems
> to
> > > be an
> > > > > > important feature for Kafka Streams and I am generally in favor
> of
> > > your
> > > > > > proposal to add Scala 3 support. However, I am personally no
> Scala
> > > > > > person and it sounds like quite some overhead.
> > > > > >
> > > > > > If you are willing to drive and own this initiative happy to
> support
> > > you
> > > > > > to the extend I can.
> > > > > >
> > > > > > About the concrete proposal: my understanding is that :core will
> move
> > > > > > off Scala long-term (not 100% sure what the timeline is, but new
> > > modules
> > > > > > are written in Java only). Thus, down the road the compatibility
> > > issue
> > > > > > would go away naturally, but it's unclear when.
> > > > > >
> > > > > > Thus, if we can test kafak-stream-scala_3 with core_2.13 it
> seems we
> > > > > > could add support for Scala 3 now, taking a risk that it might
> break
> > > in
> > > > > > the future assume that the migration off Scala from core is not
> fast
> > > > > enough.
> > > > > >
> > > > > > For proposal (2), I don't think that it would be easily possible
> for
> > > > > > unit/i

Re: Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Matthias Berndt
Thanks Josep, I've applied for a JIRA account and addressed your
review comments.

Am Do., 8. Feb. 2024 um 09:19 Uhr schrieb Josep Prat
:
>
> Hi Matthias,
>
> I think for this particular case it would be worth creating a JIRA ticket
> for this as it's a new "feature".
> Regarding the change itself, I think we need to clarify how the release
> process would work. Right now, the script `gradlewAll` is used (which
> basically runs the build with Scala version 2.12 and 2.13). If I understand
> your changes correctly, we would need to run the build 3 times:
> - 1 with property scalaVersion 2.12
> - 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
> - 1 with scalaVersion 2.13 and streamsScalaVersion 3.1
>
> I think we should document this and discuss when to have this feature. If I
> remember correctly from when I tried to update Kafka to Scala 3, the idea
> was to push this to a Kafka 4.0 version because we didn't want to maintain
> more than 2 Scala versions at the same time. I would encourage if not
> having a KIP, at least open up a [DISCUSS] thread to clarify some of these
> points.
>
> I'll add some feedback on the PR itself regarding the changes.
>
> Best,
>
> On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt 
> wrote:
>
> > Hi Matthias J., Hi Lucas, Hi Josep,
> >
> > Thank you for your encouraging responses regarding a Scala 3 port of
> > Kafka-Streams-Scala, and apologies for the late response from my side.
> > I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
> > retaining support for 2.13 and 2.12). Almost no changes to the code
> > were required and the tests also pass. Please take a look and let me
> > know what you think :-)
> > https://github.com/apache/kafka/pull/15338
> >
> > All the best
> > Matthias
> >
> > Am Do., 1. Feb. 2024 um 16:35 Uhr schrieb Josep Prat
> > :
> > >
> > > Hi,
> > >
> > > For reference, prior work on this:
> > > https://github.com/apache/kafka/pull/11350
> > > https://github.com/apache/kafka/pull/11432
> > >
> > > Best,
> > >
> > > On Thu, Feb 1, 2024, 15:55 Lucas Brutschy  > .invalid>
> > > wrote:
> > >
> > > > Hi Matthiases,
> > > >
> > > > I know Scala 2 fairly well, so I'd be happy to review changes that add
> > > > Scala 3 support. However, as Matthias S. said, it has to be driven by
> > > > people who use Scala day-to-day, since I believe most Kafka Streams
> > > > committers are working with Java.
> > > >
> > > > Rewriting the tests to not use EmbeddedKafkaCluster seems like a large
> > > > undertaking, so option 1 is the first thing we should explore.
> > > >
> > > > I don't have any experience with Scala 3 migration topics, but on the
> > > > Scala website it says
> > > > > The first piece of good news is that the Scala 3 compiler is able to
> > > > read the Scala 2.13 Pickle format and thus it can type check code that
> > > > depends on modules or libraries compiled with Scala 2.13.
> > > > > One notable example is the Scala 2.13 library. We have indeed decided
> > > > that the Scala 2.13 library is the official standard library for Scala
> > 3.
> > > > So wouldn't that mean that we are safe in terms of standard library
> > > > upgrades if we use core_2.13 in the tests?
> > > >
> > > > Cheers,
> > > > Lucas
> > > >
> > > >
> > > > On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax 
> > wrote:
> > > > >
> > > > > Thanks for raising this. The `kafka-streams-scala` module seems to
> > be an
> > > > > important feature for Kafka Streams and I am generally in favor of
> > your
> > > > > proposal to add Scala 3 support. However, I am personally no Scala
> > > > > person and it sounds like quite some overhead.
> > > > >
> > > > > If you are willing to drive and own this initiative happy to support
> > you
> > > > > to the extend I can.
> > > > >
> > > > > About the concrete proposal: my understanding is that :core will move
> > > > > off Scala long-term (not 100% sure what the timeline is, but new
> > modules
> > > > > are written in Java only). Thus, down the road the compatibility
> > issue
> > > > > would go away naturally, but it's unclear when.
> > > > >
> > > > > Thus, if we can test kafak-stream-scala_3 with core_2.13 it seems we
> > > > > could add support for Scala 3 now, taking a risk that it might break
> > in
> > > > > the future assume that the migration off Scala from core is not fast
> > > > enough.
> > > > >
> > > > > For proposal (2), I don't think that it would be easily possible for
> > > > > unit/integration tests. We could fall back to system tests though,
> > but
> > > > > they would be much more heavy weight of course.
> > > > >
> > > > > Might be good to hear from others. We might actually also want to do
> > a
> > > > > KIP for this?
> > > > >
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 1/20/24 10:34 AM, Matthias Berndt wrote:
> > > > > > Hey there,
> > > > > >
> > > > > > I'd like to discuss a Scala 3 port of the kafka-streams-scala
> > library.
> > > > > > Currently, the build system is set up such that kafka-stream

Re: Kafka-Streams-Scala for Scala 3

2024-02-08 Thread Josep Prat
Hi Matthias,

I think for this particular case it would be worth creating a JIRA ticket,
as it's a new "feature".
Regarding the change itself, I think we need to clarify how the release
process would work. Right now, the `gradlewAll` script is used, which
basically runs the build with Scala versions 2.12 and 2.13. If I understand
your changes correctly, we would need to run the build three times:
- 1 with property scalaVersion 2.12
- 1 with scalaVersion 2.13 and streamsScalaVersion 2.13
- 1 with scalaVersion 2.13 and streamsScalaVersion 3.1
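
To make that concrete, the release flow might then look roughly like the
following. This is only a sketch: `build` stands in for whatever publish or
release tasks the release scripts actually run, and `streamsScalaVersion` is
the property I understand your PR to introduce.

  # hypothetical invocations replacing today's two gradlewAll passes
  ./gradlew -PscalaVersion=2.12 build
  ./gradlew -PscalaVersion=2.13 -PstreamsScalaVersion=2.13 build
  ./gradlew -PscalaVersion=2.13 -PstreamsScalaVersion=3.1 build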

I think we should document this and discuss when to introduce this feature.
If I remember correctly from when I tried to update Kafka to Scala 3, the
idea was to push this to Kafka 4.0 because we didn't want to maintain more
than two Scala versions at the same time. If we don't have a KIP, I would
encourage at least opening a [DISCUSS] thread to clarify some of these
points.

I'll add some feedback on the PR itself regarding the changes.

Best,

On Thu, Feb 8, 2024 at 1:57 AM Matthias Berndt 
wrote:

> Hi Matthias J., Hi Lucas, Hi Josep,
>
> Thank you for your encouraging responses regarding a Scala 3 port of
> Kafka-Streams-Scala, and apologies for the late response from my side.
> I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
> retaining support for 2.13 and 2.12). Almost no changes to the code
> were required and the tests also pass. Please take a look and let me
> know what you think :-)
> https://github.com/apache/kafka/pull/15338
>
> All the best
> Matthias
>
> Am Do., 1. Feb. 2024 um 16:35 Uhr schrieb Josep Prat
> :
> >
> > Hi,
> >
> > For reference, prior work on this:
> > https://github.com/apache/kafka/pull/11350
> > https://github.com/apache/kafka/pull/11432
> >
> > Best,
> >
> > On Thu, Feb 1, 2024, 15:55 Lucas Brutschy  .invalid>
> > wrote:
> >
> > > Hi Matthiases,
> > >
> > > I know Scala 2 fairly well, so I'd be happy to review changes that add
> > > Scala 3 support. However, as Matthias S. said, it has to be driven by
> > > people who use Scala day-to-day, since I believe most Kafka Streams
> > > committers are working with Java.
> > >
> > > Rewriting the tests to not use EmbeddedKafkaCluster seems like a large
> > > undertaking, so option 1 is the first thing we should explore.
> > >
> > > I don't have any experience with Scala 3 migration topics, but on the
> > > Scala website it says
> > > > The first piece of good news is that the Scala 3 compiler is able to
> > > read the Scala 2.13 Pickle format and thus it can type check code that
> > > depends on modules or libraries compiled with Scala 2.13.
> > > > One notable example is the Scala 2.13 library. We have indeed decided
> > > that the Scala 2.13 library is the official standard library for Scala
> 3.
> > > So wouldn't that mean that we are safe in terms of standard library
> > > upgrades if we use core_2.13 in the tests?
> > >
> > > Cheers,
> > > Lucas
> > >
> > >
> > > On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax 
> wrote:
> > > >
> > > > Thanks for raising this. The `kafka-streams-scala` module seems to
> be an
> > > > important feature for Kafka Streams and I am generally in favor of
> your
> > > > proposal to add Scala 3 support. However, I am personally no Scala
> > > > person and it sounds like quite some overhead.
> > > >
> > > > If you are willing to drive and own this initiative happy to support
> you
> > > > to the extend I can.
> > > >
> > > > About the concrete proposal: my understanding is that :core will move
> > > > off Scala long-term (not 100% sure what the timeline is, but new
> modules
> > > > are written in Java only). Thus, down the road the compatibility
> issue
> > > > would go away naturally, but it's unclear when.
> > > >
> > > > Thus, if we can test kafak-stream-scala_3 with core_2.13 it seems we
> > > > could add support for Scala 3 now, taking a risk that it might break
> in
> > > > the future assume that the migration off Scala from core is not fast
> > > enough.
> > > >
> > > > For proposal (2), I don't think that it would be easily possible for
> > > > unit/integration tests. We could fall back to system tests though,
> but
> > > > they would be much more heavy weight of course.
> > > >
> > > > Might be good to hear from others. We might actually also want to do
> a
> > > > KIP for this?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 1/20/24 10:34 AM, Matthias Berndt wrote:
> > > > > Hey there,
> > > > >
> > > > > I'd like to discuss a Scala 3 port of the kafka-streams-scala
> library.
> > > > > Currently, the build system is set up such that kafka-streams-scala
> > > > > and core (i. e. kafka itself) are compiled with the same Scala
> > > > > compiler versions. This is not an optimal situation because it
> means
> > > > > that a Scala 3 release of kafka-streams-scala cannot happen
> > > > > independently of kafka itself. I think this should be changed
> > > > >
> > > > > The production codebase of scala-streams-kafka actually co

Re: Kafka-Streams-Scala for Scala 3

2024-02-07 Thread Matthias Berndt
Hi Matthias J., Hi Lucas, Hi Josep,

Thank you for your encouraging responses regarding a Scala 3 port of
Kafka-Streams-Scala, and apologies for the late response from my side.
I have now created a PR to port Kafka-Streams-Scala to Scala 3 (while
retaining support for 2.13 and 2.12). Almost no changes to the code
were required and the tests also pass. Please take a look and let me
know what you think :-)
https://github.com/apache/kafka/pull/15338

All the best
Matthias

Am Do., 1. Feb. 2024 um 16:35 Uhr schrieb Josep Prat
:
>
> Hi,
>
> For reference, prior work on this:
> https://github.com/apache/kafka/pull/11350
> https://github.com/apache/kafka/pull/11432
>
> Best,
>
> On Thu, Feb 1, 2024, 15:55 Lucas Brutschy 
> wrote:
>
> > Hi Matthiases,
> >
> > I know Scala 2 fairly well, so I'd be happy to review changes that add
> > Scala 3 support. However, as Matthias S. said, it has to be driven by
> > people who use Scala day-to-day, since I believe most Kafka Streams
> > committers are working with Java.
> >
> > Rewriting the tests to not use EmbeddedKafkaCluster seems like a large
> > undertaking, so option 1 is the first thing we should explore.
> >
> > I don't have any experience with Scala 3 migration topics, but on the
> > Scala website it says
> > > The first piece of good news is that the Scala 3 compiler is able to
> > read the Scala 2.13 Pickle format and thus it can type check code that
> > depends on modules or libraries compiled with Scala 2.13.
> > > One notable example is the Scala 2.13 library. We have indeed decided
> > that the Scala 2.13 library is the official standard library for Scala 3.
> > So wouldn't that mean that we are safe in terms of standard library
> > upgrades if we use core_2.13 in the tests?
> >
> > Cheers,
> > Lucas
> >
> >
> > On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax  wrote:
> > >
> > > Thanks for raising this. The `kafka-streams-scala` module seems to be an
> > > important feature for Kafka Streams and I am generally in favor of your
> > > proposal to add Scala 3 support. However, I am personally no Scala
> > > person and it sounds like quite some overhead.
> > >
> > > If you are willing to drive and own this initiative happy to support you
> > > to the extend I can.
> > >
> > > About the concrete proposal: my understanding is that :core will move
> > > off Scala long-term (not 100% sure what the timeline is, but new modules
> > > are written in Java only). Thus, down the road the compatibility issue
> > > would go away naturally, but it's unclear when.
> > >
> > > Thus, if we can test kafak-stream-scala_3 with core_2.13 it seems we
> > > could add support for Scala 3 now, taking a risk that it might break in
> > > the future assume that the migration off Scala from core is not fast
> > enough.
> > >
> > > For proposal (2), I don't think that it would be easily possible for
> > > unit/integration tests. We could fall back to system tests though, but
> > > they would be much more heavy weight of course.
> > >
> > > Might be good to hear from others. We might actually also want to do a
> > > KIP for this?
> > >
> > >
> > > -Matthias
> > >
> > > On 1/20/24 10:34 AM, Matthias Berndt wrote:
> > > > Hey there,
> > > >
> > > > I'd like to discuss a Scala 3 port of the kafka-streams-scala library.
> > > > Currently, the build system is set up such that kafka-streams-scala
> > > > and core (i. e. kafka itself) are compiled with the same Scala
> > > > compiler versions. This is not an optimal situation because it means
> > > > that a Scala 3 release of kafka-streams-scala cannot happen
> > > > independently of kafka itself. I think this should be changed
> > > >
> > > > The production codebase of scala-streams-kafka actually compiles just
> > > > fine on Scala 3.3.1 with two lines of trivial syntax changes. The
> > > > problem is with the tests. These use the `EmbeddedKafkaCluster` class,
> > > > which means that kafka is pulled into the classpath, potentially
> > > > leading to binary compatibility issues.
> > > > I can see several approaches to fixing this:
> > > >
> > > > 1. Run the kafka-streams-scala tests using the compatible version of
> > > > :core if one is available. Currently, this means that everything can
> > > > be tested (test kafka-streams-scala_2.12 using core_2.12,
> > > > kafka-streams-scala_2.13 using core_2.13 and kafka-streams-scala_3
> > > > using core_2.13, as these should be compatible), but when a new
> > > > scala-library version is released that is no longer compatible with
> > > > 2.13, we won't be able to test that.
> > > > 2. Rewrite the tests to run without EmbeddedKafkaCluster, instead
> > > > running the test cluster in a separate JVM or perhaps even a
> > > > container.
> > > >
> > > > I'd be willing to get my hands dirty working on this, but before I
> > > > start I'd like to get some feedback from the Kafka team regarding the
> > > > approaches outlined above.
> > > >
> > > > All the best
> > > > Matthias Berndt
> >
> KJosep Prat
> Open So

Re: Kafka-Streams-Scala for Scala 3

2024-02-01 Thread Josep Prat
Hi,

For reference, prior work on this:
https://github.com/apache/kafka/pull/11350
https://github.com/apache/kafka/pull/11432

Best,

On Thu, Feb 1, 2024, 15:55 Lucas Brutschy 
wrote:

> Hi Matthiases,
>
> I know Scala 2 fairly well, so I'd be happy to review changes that add
> Scala 3 support. However, as Matthias S. said, it has to be driven by
> people who use Scala day-to-day, since I believe most Kafka Streams
> committers are working with Java.
>
> Rewriting the tests to not use EmbeddedKafkaCluster seems like a large
> undertaking, so option 1 is the first thing we should explore.
>
> I don't have any experience with Scala 3 migration topics, but on the
> Scala website it says
> > The first piece of good news is that the Scala 3 compiler is able to
> read the Scala 2.13 Pickle format and thus it can type check code that
> depends on modules or libraries compiled with Scala 2.13.
> > One notable example is the Scala 2.13 library. We have indeed decided
> that the Scala 2.13 library is the official standard library for Scala 3.
> So wouldn't that mean that we are safe in terms of standard library
> upgrades if we use core_2.13 in the tests?
>
> Cheers,
> Lucas
>
>
> On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax  wrote:
> >
> > Thanks for raising this. The `kafka-streams-scala` module seems to be an
> > important feature for Kafka Streams and I am generally in favor of your
> > proposal to add Scala 3 support. However, I am personally no Scala
> > person and it sounds like quite some overhead.
> >
> > If you are willing to drive and own this initiative happy to support you
> > to the extend I can.
> >
> > About the concrete proposal: my understanding is that :core will move
> > off Scala long-term (not 100% sure what the timeline is, but new modules
> > are written in Java only). Thus, down the road the compatibility issue
> > would go away naturally, but it's unclear when.
> >
> > Thus, if we can test kafak-stream-scala_3 with core_2.13 it seems we
> > could add support for Scala 3 now, taking a risk that it might break in
> > the future assume that the migration off Scala from core is not fast
> enough.
> >
> > For proposal (2), I don't think that it would be easily possible for
> > unit/integration tests. We could fall back to system tests though, but
> > they would be much more heavy weight of course.
> >
> > Might be good to hear from others. We might actually also want to do a
> > KIP for this?
> >
> >
> > -Matthias
> >
> > On 1/20/24 10:34 AM, Matthias Berndt wrote:
> > > Hey there,
> > >
> > > I'd like to discuss a Scala 3 port of the kafka-streams-scala library.
> > > Currently, the build system is set up such that kafka-streams-scala
> > > and core (i. e. kafka itself) are compiled with the same Scala
> > > compiler versions. This is not an optimal situation because it means
> > > that a Scala 3 release of kafka-streams-scala cannot happen
> > > independently of kafka itself. I think this should be changed
> > >
> > > The production codebase of scala-streams-kafka actually compiles just
> > > fine on Scala 3.3.1 with two lines of trivial syntax changes. The
> > > problem is with the tests. These use the `EmbeddedKafkaCluster` class,
> > > which means that kafka is pulled into the classpath, potentially
> > > leading to binary compatibility issues.
> > > I can see several approaches to fixing this:
> > >
> > > 1. Run the kafka-streams-scala tests using the compatible version of
> > > :core if one is available. Currently, this means that everything can
> > > be tested (test kafka-streams-scala_2.12 using core_2.12,
> > > kafka-streams-scala_2.13 using core_2.13 and kafka-streams-scala_3
> > > using core_2.13, as these should be compatible), but when a new
> > > scala-library version is released that is no longer compatible with
> > > 2.13, we won't be able to test that.
> > > 2. Rewrite the tests to run without EmbeddedKafkaCluster, instead
> > > running the test cluster in a separate JVM or perhaps even a
> > > container.
> > >
> > > I'd be willing to get my hands dirty working on this, but before I
> > > start I'd like to get some feedback from the Kafka team regarding the
> > > approaches outlined above.
> > >
> > > All the best
> > > Matthias Berndt
>
Josep Prat
Open Source Engineering Director, Aiven
josep.p...@aiven.io | +491715557497 | aiven.io
Aiven Deutschland GmbH
Alexanderufer 3-7, 10117 Berlin
Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
Amtsgericht Charlottenburg, HRB 209739 B


Re: Kafka-Streams-Scala for Scala 3

2024-02-01 Thread Lucas Brutschy
Hi Matthiases,

I know Scala 2 fairly well, so I'd be happy to review changes that add
Scala 3 support. However, as Matthias S. said, it has to be driven by
people who use Scala day-to-day, since I believe most Kafka Streams
committers are working with Java.

Rewriting the tests to not use EmbeddedKafkaCluster seems like a large
undertaking, so option 1 is the first thing we should explore.

I don't have any experience with Scala 3 migration topics, but on the
Scala website it says
> The first piece of good news is that the Scala 3 compiler is able to read the 
> Scala 2.13 Pickle format and thus it can type check code that depends on 
> modules or libraries compiled with Scala 2.13.
> One notable example is the Scala 2.13 library. We have indeed decided that 
> the Scala 2.13 library is the official standard library for Scala 3.
So wouldn't that mean that we are safe in terms of standard library
upgrades if we use core_2.13 in the tests?
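
If that holds, option 1 could presumably stay very close to today's setup:
keep compiling :core with Scala 2.13 and only let the streams-scala module's
Scala version vary for the test run. Purely as an illustration (the property
name and module path below are made up just to make the idea concrete):

  # hypothetical: a property decoupling the streams-scala Scala version,
  # while :core on the test classpath stays compiled with Scala 2.13
  ./gradlew -PscalaVersion=2.13 -PstreamsScalaVersion=3 streams:streams-scala:test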

Cheers,
Lucas


On Wed, Jan 31, 2024 at 9:20 PM Matthias J. Sax  wrote:
>
> Thanks for raising this. The `kafka-streams-scala` module seems to be an
> important feature for Kafka Streams and I am generally in favor of your
> proposal to add Scala 3 support. However, I am personally no Scala
> person and it sounds like quite some overhead.
>
> If you are willing to drive and own this initiative happy to support you
> to the extend I can.
>
> About the concrete proposal: my understanding is that :core will move
> off Scala long-term (not 100% sure what the timeline is, but new modules
> are written in Java only). Thus, down the road the compatibility issue
> would go away naturally, but it's unclear when.
>
> Thus, if we can test kafak-stream-scala_3 with core_2.13 it seems we
> could add support for Scala 3 now, taking a risk that it might break in
> the future assume that the migration off Scala from core is not fast enough.
>
> For proposal (2), I don't think that it would be easily possible for
> unit/integration tests. We could fall back to system tests though, but
> they would be much more heavy weight of course.
>
> Might be good to hear from others. We might actually also want to do a
> KIP for this?
>
>
> -Matthias
>
> On 1/20/24 10:34 AM, Matthias Berndt wrote:
> > Hey there,
> >
> > I'd like to discuss a Scala 3 port of the kafka-streams-scala library.
> > Currently, the build system is set up such that kafka-streams-scala
> > and core (i.e. kafka itself) are compiled with the same Scala
> > compiler versions. This is not an optimal situation because it means
> > that a Scala 3 release of kafka-streams-scala cannot happen
> > independently of kafka itself. I think this should be changed.
> >
> > The production codebase of kafka-streams-scala actually compiles just
> > fine on Scala 3.3.1 with two lines of trivial syntax changes. The
> > problem is with the tests. These use the `EmbeddedKafkaCluster` class,
> > which means that kafka is pulled into the classpath, potentially
> > leading to binary compatibility issues.
> > I can see several approaches to fixing this:
> >
> > 1. Run the kafka-streams-scala tests using the compatible version of
> > :core if one is available. Currently, this means that everything can
> > be tested (test kafka-streams-scala_2.12 using core_2.12,
> > kafka-streams-scala_2.13 using core_2.13 and kafka-streams-scala_3
> > using core_2.13, as these should be compatible), but when a new
> > scala-library version is released that is no longer compatible with
> > 2.13, we won't be able to test that.
> > 2. Rewrite the tests to run without EmbeddedKafkaCluster, instead
> > running the test cluster in a separate JVM or perhaps even a
> > container.
> >
> > I'd be willing to get my hands dirty working on this, but before I
> > start I'd like to get some feedback from the Kafka team regarding the
> > approaches outlined above.
> >
> > All the best
> > Matthias Berndt


Re: Kafka-Streams-Scala for Scala 3

2024-01-31 Thread Matthias J. Sax
Thanks for raising this. The `kafka-streams-scala` module seems to be an 
important feature for Kafka Streams and I am generally in favor of your 
proposal to add Scala 3 support. However, I am personally no Scala 
person and it sounds like quite some overhead.


If you are willing to drive and own this initiative, I'm happy to support you 
to the extent I can.


About the concrete proposal: my understanding is that :core will move 
off Scala long-term (not 100% sure what the timeline is, but new modules 
are written in Java only). Thus, down the road the compatibility issue 
would go away naturally, but it's unclear when.


Thus, if we can test kafka-streams-scala_3 with core_2.13 it seems we 
could add support for Scala 3 now, taking the risk that it might break in 
the future, assuming that the migration off Scala in core is not fast enough.


For proposal (2), I don't think that it would be easily possible for 
unit/integration tests. We could fall back to system tests though, but 
they would be much more heavy weight of course.


Might be good to hear from others. We might actually also want to do a 
KIP for this?



-Matthias

On 1/20/24 10:34 AM, Matthias Berndt wrote:

Hey there,

I'd like to discuss a Scala 3 port of the kafka-streams-scala library.
Currently, the build system is set up such that kafka-streams-scala
and core (i.e. kafka itself) are compiled with the same Scala
compiler versions. This is not an optimal situation because it means
that a Scala 3 release of kafka-streams-scala cannot happen
independently of kafka itself. I think this should be changed.

The production codebase of kafka-streams-scala actually compiles just
fine on Scala 3.3.1 with two lines of trivial syntax changes. The
problem is with the tests. These use the `EmbeddedKafkaCluster` class,
which means that kafka is pulled into the classpath, potentially
leading to binary compatibility issues.
I can see several approaches to fixing this:

1. Run the kafka-streams-scala tests using the compatible version of
:core if one is available. Currently, this means that everything can
be tested (test kafka-streams-scala_2.12 using core_2.12,
kafka-streams-scala_2.13 using core_2.13 and kafka-streams-scala_3
using core_2.13, as these should be compatible), but when a new
scala-library version is released that is no longer compatible with
2.13, we won't be able to test that.
2. Rewrite the tests to run without EmbeddedKafkaCluster, instead
running the test cluster in a separate JVM or perhaps even a
container.

I'd be willing to get my hands dirty working on this, but before I
start I'd like to get some feedback from the Kafka team regarding the
approaches outlined above.

All the best
Matthias Berndt


Re: [Kafka Streams], Processor API for KTable and KGroupedStream

2024-01-12 Thread Matthias J. Sax
`KGroupedStream` is just an "intermediate representation" to get a 
better flow in the DSL. It's not a "top level" abstraction like 
KStream/KTable.


For `KTable` there is `transformValues()` -- there is no `transform()` 
because keying must be preserved -- if you want to change the keying, you 
need to use `KTable#groupBy()` (data needs to be repartitioned if you 
change the key).
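
For illustration, a minimal sketch of the two paths (topic name, types, serdes,
and the aggregation are made-up placeholders, not from this thread); it uses
mapValues() as the simplest key-preserving operation:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();

// a table of long values keyed by strings
KTable<String, Long> table = builder.table(
        "input-topic", Consumed.with(Serdes.String(), Serdes.Long()));

// mapValues()/transformValues() keep the key
KTable<String, Long> doubled = table.mapValues(v -> v * 2);

// changing the key requires groupBy() plus an aggregation, which repartitions the data
KTable<Long, Long> reKeyed = table
        .groupBy((k, v) -> KeyValue.pair(v, v), Grouped.with(Serdes.Long(), Serdes.Long()))
        .reduce((agg, newValue) -> newValue,   // adder
                (agg, oldValue) -> agg);       // subtractor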


HTH.

-Matthias

On 1/12/24 3:09 AM, Igor Maznitsa wrote:

Hello

Is there any way in the Kafka Streams API to have processors for KTable and 
KGroupedStream like KStream#transform? How can I provide a complex 
processor for KTable or KGroupedStream which could provide a way to not 
forward events downstream for some business logic?





Re: Kafka Streams Issue

2022-04-01 Thread John Roesler
Hi Daan,

First of all, it does sound like that is a correct
implementation of QueryableStoreProvider. Kudos for taking
that on; the complexity of that API was one of my top
motivations for replacing it with IQv2!
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2
)

To answer your question directly, no: "activeHost" just means
the host that currently has the "activeTask" for the desired
store.

I suspect that this is either a subtle and rare edge case in
how the metadata gets updated, or it's just a simple race
condition between the query and a rebalance in the cluster,
which is a fact of life in any distributed database.

If you are able to reproduce it and send us the logs, we
should be able to tell which is which.

In particular, we'd need to see three things in the logs:
1. The logs for the rebalances and assignments (which are on
by default)
2. The log of when you check the metadata and what the
result is
3. The log of when the query tries to run on the
"activeHost" and what it finds there (that the task is only
a standby)

One other possibility worth considering is whether the
queryMetadataForKey is producing the correct partition. What
it does is run the provided key through the provided
serializer and then run the serialized key through the
default partitioner. If your actual data isn't partitioned
the same way, then queryMetadataForKey might be effectively
selecting a random host, which sometimes happens to host the
active task and other times does not? Kind of a long shot,
but I just wanted to put it out there.
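
For reference, a rough sketch of the lookup path on a recent client (store
name, key, and serializer here are assumptions; `kafkaStreams` is the running
KafkaStreams instance):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

// ask Streams which host/partition should currently be active for this key
KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(
        "my-store", "some-key", Serdes.String().serializer());

HostInfo activeHost = metadata.activeHost();  // host believed to own the active task
int partition = metadata.partition();         // partition the serialized key hashes to

// the serializer passed here plus the default partitioner must match how the data
// was actually partitioned, otherwise this can point at the wrong host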

Thanks,
-John


On Mon, 2022-03-28 at 13:48 +, Daan Gertis wrote:
> Hi All,
> 
> We are experiencing some weird behaviour with our interactive query service 
> implementation.
> This is the flow we’ve implemented:
> 
> 
>   1.  kafkaStreams.queryMetadataForKey(store, key, serializer) returns for 
> activeHost HostInfo{host='localhost', port=8562}, and standbyHosts [] for the 
> store and partition where the key would reside. We are not interested in 
> standby hosts. Luckily, we have an active host which we can call.
>   2.  We make an HTTP call to host localhost:8562, asking for the key there.
>   3.  Inside the 8562 host, we retrieve the store by calling 
> kafkaStreams.store(parameters), using parameters with staleStores set to 
> false.
>   4.  We call kafkaStreams.state().equals(RUNNING) to make sure we’re in the 
> RUNNING state.
>   5.  Now we call store.get(key) in order to retrieve the key from the store, 
> if it has been stored there.
>   6.  The get method on our store implementation calls the 
> storeProvider.stores(storeName, storeType) method to iterate over all the 
> stores available on the host.
>   7.  The storeProvider is a WrappingStoreProvider, which calls 
> storeProvider.stores(storeQueryParameters) for each 
> StreamThreadStateStoreProvider it wraps (just one in our case).
>   8.  As the logic inside that stores method finds that the StreamThread is 
> in the RUNNING state, it retrieves the tasks based on 
> storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() : 
> streamThread.activeTasks(), which evaluates to false since we set staleStores 
> to false in the params.
>   9.  To our surprise, the streamThread.activeTasks() method returns an empty 
> ArrayList, while the streamThread.allTasks().values() returns one StandbyTask 
> for the store we’re looking for.
>   10. As there appear to be no active tasks on this host for this store, we 
> return the fabled “The state store, " + storeName + ", may have migrated to 
> another instance.” InvalidStateStoreException.
> 
> This flow is quite tricky as the queryMetadataForKey returns an active host, 
> which turns out to only have a standby task once queried.
> I have executed the queryMetadataForKey method on the active host as well, 
> once before calling kafkaStreams.store in step 3, and another time between 
> step 4 and 5. Each time the metadata returns the same, the host we’re on at 
> that moment is the active host.
> 
> Could it be there is a difference between activeHost and activeTask?
> 
> Those also on the Confluent community Slack might recognize this message, 
> as it has been posted there by our CTO as well.
> 
> Cheers,
> D.



Re: Kafka Streams: Dynamic Topic Routing & Nonexistent Topics

2020-07-08 Thread Rhys Anthony McCaig
Guozhang - thank you for your thoughts.

You are right - this is more about the producer client than the streams
client.

caching the metadata outside producer, e.g. in an admin client would not
> be a perfect solution since in either way your metadata cache inside the
> producer or inside the admin client would not guarantee to be always up to
> date


It's not perfect, but fortunately in my case it's "good enough" as the main
concern is not letting poorly behaved clients hold up the processing
because they forgot to set up the topics their records should be sent to.

letting the send() call to fail with an UnknownTopicOrPartitionError and
> push the burden on the caller to decide what to do (either wait and retry,
> or give up and stop the world etc) may work, but that requires modifying
> the interface semantics, or at least adding an overloaded function of
> "send()". Maybe worth discussing in a KIP.


The more I think about it, the more I like the idea of differentiating
between a metadata refresh timeout and the case where the metadata was able
to be refreshed yet still didn't contain the topic (or partition). I'll
take a bit more of a look at the existing implementation and try to find
some time to write a KIP for this - as you pointed out this modifies the
interface semantics, so it would need to be an additive and opt in change.

since max.block is a
global config it may also affect other blocking calls like txn-related ones
as well.


Yes, that was my concern with this approach as well, and hence why I think
the admin client workaround is my best approach at the moment.
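
Roughly what I have in mind for that workaround, as a sketch (the method name,
scheduling, and error handling are placeholders):

import java.util.Set;
import org.apache.kafka.clients.admin.Admin;

// refreshed on a schedule; records routed to a topic that is not in the returned set
// get logged and dropped instead of blocking the producer on a metadata lookup
static Set<String> refreshTopicCache(Admin admin) throws Exception {
    return admin.listTopics().names().get();
}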

Cheers,
Rhys

On Sat, Jul 4, 2020 at 8:36 PM Guozhang Wang  wrote:

> Hello,
>
> Thanks for reaching out to the community for this. I think (maybe you've
> also suggested) it is rather an observation on producer client than on
> streams client. Generally speaking we want to know if we can fail fast if
> the metadata cannot be found in producer.send() call. And here are my
> thoughts:
>
> 1) caching the metadata outside producer, e.g. in an admin client would not
> be a perfect solution since in either way your metadata cache inside the
> producer or inside the admin client would not guarantee to be always up to
> date: e.g. maybe you've decided to fail the record to send since it was not
> in the cache, but one second right after it the metadata gets refreshed and
> contains that topic.
>
> 2) letting the send() call to fail with an UnknownTopicOrPartitionError and
> push the burden on the caller to decide what to do (either wait and retry,
> or give up and stop the world etc) may work, but that requires modifying
> the interface semantics, or at least adding an overloaded function of
> "send()". Maybe worth discussing in a KIP.
>
> 3) for your specific case, if you believe the metadata should be static and
> not changed (i.e. you assume all topics should be pre-created and none
> would be created later), then I think setting max.block to a smaller value
> and just catch TimeoutException is fine since for send() itself, the
> max.block is only used for metadata refresh and buffer allocation when it
> is not sufficient, and the latter should be a rare case assuming you set the
> buffer.size to be reasonably large. But note that since max.block is a
> global config it may also affect other blocking calls like txn-related ones
> as well.
>
>
> On Wed, Jul 1, 2020 at 6:10 PM Rhys Anthony McCaig 
> wrote:
>
> > Hi All,
> >
> > I have been recently working on a streams application that uses a
> > TopicNameExtractor to dynamically route records based on the payload.
> This
> > streams application is used by various other applications, and
> occasionally
> > these other applications request for a record to be sent to a
> non-existent
> > topic - rather than this topic be created, the message should be logged
> and
> > dropped.
> >
> > Unfortunately, I don't seem to have found a good way to implement this
> > behaviour in a reliable way: I originally hoped to be able to catch these
> > scenarios in a ProductionExceptionHandler by catching an
> > UnknownTopicOrPartitionError, however the current producer behaviour is
> to
> > wait for max.block.ms in waitOnMetadata() for partitions to be returned
> > for
> > the topic before throwing a TimeoutException. If after refreshing
> metadata,
> > there are still no partitions for the requested topic, it will continue
> to
> > request an update until the timeout is reached:  (
> >
> >
> https://github.com/apache/kafka/blob/b8a99be7847c61d7792689b71fda5b283f8340a8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1051
> > )
> > For my use case, there are two challenges here:
> > 1. ProductionExceptionHandler must catch TimeoutException and inspect the
> > message to determine that the exception was caused by not finding the
> topic
> > in the metadata
> > 2. The streams task blocks (as expected) while the producer is fetching
> > metadata, holding up processing of other records

Re: Kafka Streams: Dynamic Topic Routing & Nonexistent Topics

2020-07-04 Thread Guozhang Wang
Hello,

Thanks for reaching out to the community for this. I think (maybe you've
also suggested) it is rather an observation on producer client than on
streams client. Generally speaking we want to know if we can fail fast if
the metadata cannot be found in producer.send() call. And here are my
thoughts:

1) caching the metadata outside producer, e.g. in an admin client would not
be a perfect solution since in either way your metadata cache inside the
producer or inside the admin client would not guarantee to be always up to
date: e.g. maybe you've decided to fail the record to send since it was not
in the cache, but one second right after it the metadata gets refreshed and
contains that topic.

2) letting the send() call to fail with an UnknownTopicOrPartitionError and
push the burden on the caller to decide what to do (either wait and retry,
or give up and stop the world etc) may work, but that requires modifying
the interface semantics, or at least adding an overloaded function of
"send()". Maybe worth discussing in a KIP.

3) for your specific case, if you believe the metadata should be static and
not changed (i.e. you assume all topics should be pre-created and none
would be created later), then I think setting max.block to a smaller value
and just catch TimeoutException is fine since for send() itself, the
max.block is only used for metadata refresh and buffer allocation when it
is not sufficient, and the latter should be a rare case assuming you set the
buffer.size to be reasonably large. But note that since max.block is a
global config it may also affect other blocking calls like txn-related ones
as well.
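
As a rough sketch of 3) (config values and topic name below are placeholders only):

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); // global: also bounds txn-related calls

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    try {
        producer.send(new ProducerRecord<>("maybe-missing-topic", "key", "value")).get();
    } catch (ExecutionException e) {
        if (e.getCause() instanceof TimeoutException) {
            // metadata for the topic was not found within max.block.ms: log and drop
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}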


On Wed, Jul 1, 2020 at 6:10 PM Rhys Anthony McCaig  wrote:

> Hi All,
>
> I have been recently working on a streams application that uses a
> TopicNameExtractor to dynamically route records based on the payload. This
> streams application is used by various other applications, and occasionally
> these other applications request for a record to be sent to a non-existent
> topic - rather than this topic be created, the message should be logged and
> dropped.
>
> Unfortunately, I don't seem to have found a good way to implement this
> behaviour in a reliable way: I originally hoped to be able to catch these
> scenarios in a ProductionExceptionHandler by catching an
> UnknownTopicOrPartitionError, however the current producer behaviour is to
> wait for max.block.ms in waitOnMetadata() for partitions to be returned
> for
> the topic before throwing a TimeoutException. If after refreshing metadata,
> there are still no partitions for the requested topic, it will continue to
> request an update until the timeout is reached:  (
>
> https://github.com/apache/kafka/blob/b8a99be7847c61d7792689b71fda5b283f8340a8/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1051
> )
> For my use case, there are two challenges here:
> 1. ProductionExceptionHandler must catch TimeoutException and inspect the
> message to determine that the exception was caused by not finding the topic
> in the metadata
> 2. The streams task blocks (as expected) while the producer is fetching
> metadata, holding up processing of other records, until the timeout
> exception is thrown.
>
> Rather than accept the stream blocking in this scenario, my current
> thinking is to use AdminClient to keep a cache of existing/nonexisting
> topics periodically updated and filter based on this - however i can't stop
> thinking that this feels clunky, given the producer maintains its own cache
> of recently accessed topics/partitions.
>
> Would it make sense to enhance KafkaProducer to:
> - Optionally fail fast when the first metadata refresh does not return the
> requested topic, or partition count? (And maybe even optionally cache
> this?)
> - Differentiate between a TimeoutException and
> UnknownTopicOrPartitionError?
>
> My understanding of the internals isn't great - I'm not clear on the reason
> to continue to request metadata updates after getting a new version - is
> there a possible issue with getting stale metadata from brokers?
>
> Looking forward to your thoughts!
>


-- 
-- Guozhang


Re: kafka streams application design on state stores

2019-10-07 Thread Matthias J. Sax
I am not sure if I understand your scenario. What do you exactly mean by
"background job" and by "sweeps the records in the store using range
calls"?

As you noted, only the `Processor` that has access to the store should
write/delete anything from it. You should not try to modify the store
from "outside" (and this would only be possible if you bypass the
public APIs anyway...)
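
If it helps, a sketch (using the current Processor API; the store name and the
sweep condition are placeholders) of doing the sweep-and-delete from within the
topology via a punctuation, so the store is only ever modified by its owning
processor:

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class SweepingProcessor implements Processor<String, Long, Void, Void> {
    private KeyValueStore<String, Long> store;

    @Override
    public void init(final ProcessorContext<Void, Void> context) {
        store = context.getStateStore("sweep-store");
        // periodically sweep a key range and delete matching records in-topology
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> iter = store.range("a", "z")) {
                while (iter.hasNext()) {
                    final KeyValue<String, Long> entry = iter.next();
                    if (entry.value == 0L) {      // placeholder for the real sweep condition
                        store.delete(entry.key);  // removal happens inside the topology
                    }
                }
            }
        });
    }

    @Override
    public void process(final Record<String, Long> record) {
        store.put(record.key(), record.value());
    }
}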

Maybe it would help to understand in more detail what you try to achieve
to give better advice?


-Matthias


On 9/26/19 9:45 AM, ChienHsing Wu wrote:
> Hi,
> 
> I am using Kafka Streams' RocksDB state stores in an application, and there is a 
> background job that sweeps the records in the store using range calls. 
> During processing, we need to ensure that a record is not included in the 
> next range call when that background job runs next time. I understand that 
> the exposed state store interface to the application is read-only and the main 
> reason is to ensure the only way to change records inside RocksDB is from the 
> stream topology. However, there is no way to guarantee that stream processing of 
> the removal of the record occurs before that background job runs again.
> 
> The only way I can think of is to remove that record from RocksDB directly.
> 
> I would like to hear any suggestions about
> 
> 
>   1.  If it's OK to remove records in RocksDB outside of the stream topology
>   2.  Any other suggestions to address my goal
> 
> Thanks in advance for your valuable inputs.
> 
> ChienHsing Wu
> 





Re: Kafka streams rebalancing issue

2019-06-17 Thread Matthias J. Sax
> This leads to a LockException in
>> consumer2 and consumer2 remains in a livelock trying to create state directories for
>> those two partitions.

There is a fix for a `LockException` during rebalance in 0.11.0.1:
https://issues.apache.org/jira/browse/KAFKA-5167

Maybe upgrading helps? Note that you can upgrade Kafka Streams
independently of your brokers. Btw: I would recommend upgrading to at
least 0.11.0.3, which is the latest bug-fix release for 0.11.0; in
general, if there is a bug-fix release, it's recommended to upgrade.

Besides the bug-fix release, I would recommend upgrading to a newer version
anyway (including brokers if possible), as 0.11.0 is already 2 years old...



-Matthias


On 6/15/19 12:20 AM, Aadhil RF wrote:
> Hi All,
> 
>  We have two consumers in a consumer group subscribed to the topic.
> Both consumers are on different servers. The topic consists of 11
> partitions and a replication factor of 1. Normally, 5 partitions are consumed in
> consumer 1 and the remaining ones in consumer 2. Whenever there is a connection
> glitch between the consumers and the coordinator, the rebalance procedure
> runs on both consumers. During this procedure, all 11 partitions are
> assigned to consumer1 and two of the partitions (which are assigned to
> consumer1) are also assigned to consumer2. This leads to a LockException in
> consumer2 and consumer2 remains in a livelock trying to create state directories for
> those two partitions.
> 
> Kafka version: 0.11.0.0
> Zookeeper: 3.4.10
> 





Re: Kafka streams issue

2019-06-14 Thread Matthias J. Sax
It is correct that Kafka Streams does not create input/output/through()
topics. You should create those topics upfront.
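
For example, a minimal sketch of creating them with the admin client (topic
names, partition counts, and replication factor are placeholders):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicsUpfront {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(List.of(
                    new NewTopic("input-topic", 6, (short) 3),
                    new NewTopic("output-topic", 6, (short) 3)))
                .all().get();  // throws if the topics cannot be created
        }
    }
}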

> We've *set auto.create.topics.enable=true *in our kafka
>> server properties file and we'd expect that the topics get created, but the
>> consumer goes into SHUTDOWN mode and eventually gets removed as a consumer
>> from kafka. Is this normal behavior?

If you set the config to `true`, the broker would auto-create the
corresponding topic. Hence, it's recommended to disable auto topic creation.

> and we'd expect that the topics get created,

Why? If you enable auto topic create, you should expect that the topics
are auto created. Or did you mean "false" instead of "true"?


Kafka Streams would not get any error if input topics are missing. IIRC,
there should be an error if Kafka Streams tries to write into
a non-existing topic though. However, if both input and output topics
are missing, no data would be processed, thus no write is attempted,
and thus Kafka Streams would not shut down.


-Matthias

On 6/12/19 5:13 PM, Brian Putt wrote:
> After adding more logging details, we found that we hadn't created topics
> yet for the consumer & producer for the streams application. We've added a
> check when starting up to verify that the topics exist, otherwise we exit
> our app. We're not dynamically creating topics and we want to create them
> upfront as we want to specify replication-factor and # of partitions as the
> defaults won't suffice.
> 
> If I understand correctly, streams should've automagically created topics
> that get consumed, but shouldn't create topics for the producer portion of
> the streams lib. We've *set auto.create.topics.enable=true *in our kafka
> server properties file and we'd expect that the topics get created, but the
> consumer goes into SHUTDOWN mode and eventually gets removed as a consumer
> from kafka. Is this normal behavior?
> 
> On Wed, Jun 12, 2019 at 3:44 AM Matthias J. Sax 
> wrote:
> 
>> Does the application transit to RUNNING state? Also check
>> `KafkaStreams#localThreadsMetadata()` what tasks are assigned?
>>
>> You might also enable DEBUG logs for
>> `org.apache.kafka.clients.consumer.**` classes to see if the consumer
>> sends fetch request to the broker.
>>
>>
>> -Matthias
>>
>> On 6/11/19 7:03 PM, Brian Putt wrote:
>>> The application just hangs (we let it sit for ~1 hour, small dataset as
>>> we're testing), we can restart it listening to 1 of the 3 topics we start
>>> it with and it chugs along, no problem. The same code is executed as
>>> separate application.ids listening to other topics without any issues.
>>> We'll try to increase our logging as nothing is currently being shown in
>>> the logs. Guessing we have our level set to WARN.
>>>
>>> Will certainly share updates as we figure it out.
>>>
>>> Thanks!
>>>
>>> On Tue, Jun 11, 2019 at 6:07 PM Matthias J. Sax 
>>> wrote:
>>>
 What do you exactly observe?

  - Does the application rebalance correctly?
  - Does it start processing?
  - Anything in the logs about the status of the application?


 -Matthias

 On 6/10/19 6:19 PM, Brian Putt wrote:
> Hello,
>
> I'm working with the kafka streams api and am running into issues
>> where I
> subscribe to multiple topics and the consumer just hangs. It has a
>> unique
> application.id and I can see in kafka that the consumer group has been
> created, but when I describe the group, I'll get: consumer group X has
>> no
> active members
>
> The interesting thing is that this works when the topics list only
 contains
> 1 topic. I'm not interested in other answers where we create multiple
> sources, ie: source1 = builder.stream("topic1") and source2 =
> builder.stream("topic2") as the interface for StreamsBuilder.stream
 supports
> an array of topics.
>
> I've been able to subscribe to multiple topics before, I just can't
> replicate how we've done this. (This code is running in a different
> environment and working as expected, so not sure if it's a timing issue
 or
> something else)
>
> List topics = Arrays.asList("topic1", "topic2");
>
> StreamsBuilder builder = new StreamsBuilder();
> KStream source = builder.stream(topics);
>
> source
> .transformValues(...)
> .map((key, value) -> ...)
> .to((key, value, record) -> ...);
>
> new KafkaStreams(builder.build(), props).start();
>
> This question has been posted on stackoverflow in case you want to
> answer there:

>> https://stackoverflow.com/questions/56535113/kafka-streams-listening-to-multiple-topics-hangs
>


>>>
>>
>>
> 





Re: Kafka streams issue

2019-06-12 Thread Brian Putt
After adding more logging details, we found that we hadn't created topics
yet for the consumer & producer for the streams application. We've added a
check when starting up to verify that the topics exist, otherwise we exit
our app. We're not dynamically creating topics and we want to create them
upfront as we want to specify replication-factor and # of partitions as the
defaults won't suffice.

If I understand correctly, streams should've automagically created topics
that get consumed, but shouldn't create topics for the producer portion of
the streams lib. We've *set auto.create.topics.enable=true *in our kafka
server properties file and we'd expect that the topics get created, but the
consumer goes into SHUTDOWN mode and eventually gets removed as a consumer
from kafka. Is this normal behavior?

On Wed, Jun 12, 2019 at 3:44 AM Matthias J. Sax 
wrote:

> Does the application transit to RUNNING state? Also check
> `KafkaStreams#localThreadsMetadata()` what tasks are assigned?
>
> You might also enable DEBUG logs for
> `org.apache.kafka.clients.consumer.**` classes to see if the consumer
> sends fetch request to the broker.
>
>
> -Matthias
>
> On 6/11/19 7:03 PM, Brian Putt wrote:
> > The application just hangs (we let it sit for ~1 hour, small dataset as
> > we're testing), we can restart it listening to 1 of the 3 topics we start
> > it with and it chugs along, no problem. The same code is executed as
> > separate application.ids listening to other topics without any issues.
> > We'll try to increase our logging as nothing is currently being shown in
> > the logs. Guessing we have our level set to WARN.
> >
> > Will certainly share updates as we figure it out.
> >
> > Thanks!
> >
> > On Tue, Jun 11, 2019 at 6:07 PM Matthias J. Sax 
> > wrote:
> >
> >> What do you exactly observe?
> >>
> >>  - Does the application rebalance correctly?
> >>  - Does it start processing?
> >>  - Anything in the logs about the status of the application?
> >>
> >>
> >> -Matthias
> >>
> >> On 6/10/19 6:19 PM, Brian Putt wrote:
> >>> Hello,
> >>>
> >>> I'm working with the kafka streams api and am running into issues
> where I
> >>> subscribe to multiple topics and the consumer just hangs. It has a
> unique
> >>> application.id and I can see in kafka that the consumer group has been
> >>> created, but when I describe the group, I'll get: consumer group X has
> no
> >>> active members
> >>>
> >>> The interesting thing is that this works when the topics list only
> >> contains
> >>> 1 topic. I'm not interested in other answers where we create multiple
> >>> sources, ie: source1 = builder.stream("topic1") and source2 =
> >>> builder.stream("topic2") as the interface for StreamsBuilder.stream
> >> supports
> >>> an array of topics.
> >>>
> >>> I've been able to subscribe to multiple topics before, I just can't
> >>> replicate how we've done this. (This code is running in a different
> >>> environment and working as expected, so not sure if it's a timing issue
> >> or
> >>> something else)
> >>>
> >>> List topics = Arrays.asList("topic1", "topic2");
> >>>
> >>> StreamsBuilder builder = new StreamsBuilder();
> >>> KStream source = builder.stream(topics);
> >>>
> >>> source
> >>> .transformValues(...)
> >>> .map((key, value) -> ...)
> >>> .to((key, value, record) -> ...);
> >>>
> >>> new KafkaStreams(builder.build(), props).start();
> >>>
> >>> This question has been posted on stackoverflow in case you want to
> >>> answer there:
> >>
> https://stackoverflow.com/questions/56535113/kafka-streams-listening-to-multiple-topics-hangs
> >>>
> >>
> >>
> >
>
>


Re: Kafka streams issue

2019-06-12 Thread Matthias J. Sax
Does the application transit to RUNNING state? Also check
`KafkaStreams#localThreadsMetadata()` what tasks are assigned?

You might also enable DEBUG logs for
`org.apache.kafka.clients.consumer.**` classes to see if the consumer
sends fetch request to the broker.
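
For example, something along these lines (a sketch against a 2.x client;
`kafkaStreams` is your running instance):

import org.apache.kafka.streams.processor.ThreadMetadata;

// log state transitions, e.g. CREATED -> REBALANCING -> RUNNING
kafkaStreams.setStateListener((newState, oldState) ->
        System.out.println("state: " + oldState + " -> " + newState));

// dump the task assignment per stream thread
for (ThreadMetadata thread : kafkaStreams.localThreadsMetadata()) {
    System.out.println(thread.threadName()
            + " active=" + thread.activeTasks()
            + " standby=" + thread.standbyTasks());
}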


-Matthias

On 6/11/19 7:03 PM, Brian Putt wrote:
> The application just hangs (we let it sit for ~1 hour, small dataset as
> we're testing), we can restart it listening to 1 of the 3 topics we start
> it with and it chugs along, no problem. The same code is executed as
> separate application.ids listening to other topics without any issues.
> We'll try to increase our logging as nothing is currently being shown in
> the logs. Guessing we have our level set to WARN.
> 
> Will certainly share updates as we figure it out.
> 
> Thanks!
> 
> On Tue, Jun 11, 2019 at 6:07 PM Matthias J. Sax 
> wrote:
> 
>> What do you exactly observe?
>>
>>  - Does the application rebalance correctly?
>>  - Does it start processing?
>>  - Anything in the logs about the status of the application?
>>
>>
>> -Matthias
>>
>> On 6/10/19 6:19 PM, Brian Putt wrote:
>>> Hello,
>>>
>>> I'm working with the kafka streams api and am running into issues where I
>>> subscribe to multiple topics and the consumer just hangs. It has a unique
>>> application.id and I can see in kafka that the consumer group has been
>>> created, but when I describe the group, I'll get: consumer group X has no
>>> active members
>>>
>>> The interesting thing is that this works when the topics list only
>> contains
>>> 1 topic. I'm not interested in other answers where we create multiple
>>> sources, ie: source1 = builder.stream("topic1") and source2 =
>>> builder.stream("topic2") as the interface for StreamsBuilder.stream
>> supports
>>> an array of topics.
>>>
>>> I've been able to subscribe to multiple topics before, I just can't
>>> replicate how we've done this. (This code is running in a different
>>> environment and working as expected, so not sure if it's a timing issue
>> or
>>> something else)
>>>
>>> List topics = Arrays.asList("topic1", "topic2");
>>>
>>> StreamsBuilder builder = new StreamsBuilder();
>>> KStream source = builder.stream(topics);
>>>
>>> source
>>> .transformValues(...)
>>> .map((key, value) -> ...)
>>> .to((key, value, record) -> ...);
>>>
>>> new KafkaStreams(builder.build(), props).start();
>>>
>>> This question has been posted on stackoverflow in case you want to
>>> answer there:
>> https://stackoverflow.com/questions/56535113/kafka-streams-listening-to-multiple-topics-hangs
>>>
>>
>>
> 





Re: Kafka streams issue

2019-06-11 Thread Brian Putt
The application just hangs (we let it sit for ~1 hour, small dataset as
we're testing), we can restart it listening to 1 of the 3 topics we start
it with and it chugs along, no problem. The same code is executed as
separate application.ids listening to other topics without any issues.
We'll try to increase our logging as nothing is currently being shown in
the logs. Guessing we have our level set to WARN.

Will certainly share updates as we figure it out.

Thanks!

On Tue, Jun 11, 2019 at 6:07 PM Matthias J. Sax 
wrote:

> What do you exactly observe?
>
>  - Does the application rebalance correctly?
>  - Does it start processing?
>  - Anything in the logs about the status of the application?
>
>
> -Matthias
>
> On 6/10/19 6:19 PM, Brian Putt wrote:
> > Hello,
> >
> > I'm working with the kafka streams api and am running into issues where I
> > subscribe to multiple topics and the consumer just hangs. It has a unique
> > application.id and I can see in kafka that the consumer group has been
> > created, but when I describe the group, I'll get: consumer group X has no
> > active members
> >
> > The interesting thing is that this works when the topics list only
> contains
> > 1 topic. I'm not interested in other answers where we create multiple
> > sources, ie: source1 = builder.stream("topic1") and source2 =
> > builder.stream("topic2") as the interface for StreamsBuilder.stream
> supports
> > an array of topics.
> >
> > I've been able to subscribe to multiple topics before, I just can't
> > replicate how we've done this. (This code is running in a different
> > environment and working as expected, so not sure if it's a timing issue
> or
> > something else)
> >
> > List topics = Arrays.asList("topic1", "topic2");
> >
> > StreamsBuilder builder = new StreamsBuilder();
> > KStream source = builder.stream(topics);
> >
> > source
> > .transformValues(...)
> > .map((key, value) -> ...)
> > .to((key, value, record) -> ...);
> >
> > new KafkaStreams(builder.build(), props).start();
> >
> > This question has been posted on stackoverflow in case you want to
> > answer there:
> https://stackoverflow.com/questions/56535113/kafka-streams-listening-to-multiple-topics-hangs
> >
>
>


Re: Kafka streams issue

2019-06-11 Thread Matthias J. Sax
What do you exactly observe?

 - Does the application rebalance correctly?
 - Does it start processing?
 - Anything in the logs about the status of the application?


-Matthias

On 6/10/19 6:19 PM, Brian Putt wrote:
> Hello,
> 
> I'm working with the kafka streams api and am running into issues where I
> subscribe to multiple topics and the consumer just hangs. It has a unique
> application.id and I can see in kafka that the consumer group has been
> created, but when I describe the group, I'll get: consumer group X has no
> active members
> 
> The interesting thing is that this works when the topics list only contains
> 1 topic. I'm not interested in other answers where we create multiple
> sources, ie: source1 = builder.stream("topic1") and source2 =
> builder.stream("topic2") as the interface for StreamsBuilder.stream supports
> an array of topics.
> 
> I've been able to subscribe to multiple topics before, I just can't
> replicate how we've done this. (This code is running in a different
> environment and working as expected, so not sure if it's a timing issue or
> something else)
> 
> List topics = Arrays.asList("topic1", "topic2");
> 
> StreamsBuilder builder = new StreamsBuilder();
> KStream source = builder.stream(topics);
> 
> source
> .transformValues(...)
> .map((key, value) -> ...)
> .to((key, value, record) -> ...);
> 
> new KafkaStreams(builder.build(), props).start();
> 
> This question has been posted on stackoverflow in case you want to
> answer there: 
> https://stackoverflow.com/questions/56535113/kafka-streams-listening-to-multiple-topics-hangs
> 





Re: Kafka Streams Library for Go

2019-05-22 Thread Alex Woolford
Hi Jay,

I'm not aware of a Go implementation of Kafka Streams in Apache Kafka. If
anyone here knows something I don't, feel free to correct me.

goka (https://github.com/lovoo/goka), while not part of Apache Kafka, seems
to be pretty popular. You may also want to check out KSQL which, although
it's not written in Go, may allow you to implement your streaming use
case(s) without writing a ton of Java.

Cheers,

Alex

On Wed, May 22, 2019 at 8:40 PM Jay Heydt  wrote:

> Do you have an estimate of when there will be a Kafka Streams library for
> Go?
>
> Thank you!
> Jay
>


Re: Kafka Streams Join does not work

2019-03-15 Thread Ryanne Dolan
If your producer is using transactions, it's possible that the producer was
killed in the middle of a transaction, in which case any un-committed
records would be logged on the broker but skipped by downstream consumers.

Otherwise, it's likely that the leader for the partition crashed before the
record was replicated to the 2 other brokers.

In either case, downstream consumers would ignore the record as if it never
existed.

re why the join didn't work: the consumers processing the partition would
ignore the uncommitted record, and Streams would never see it. This is by
design.

Ryanne

On Fri, Mar 15, 2019 at 10:36 AM Федор Чернилин 
wrote:

> Thanks, I understand how the consumer works. The main question is related to
> why the join did not work and how it happened that only one message
> remained uncommitted.
>
Fri, Mar 15, 2019 at 16:29, Ryanne Dolan :
>
> > Hello! When using exactly-once semantics, uncommitted or aborted records
> > are skipped by the consumer as if they don't exist.
> >
> > When inspecting the topic manually, use isolation.level=read_committed to
> > get the same behavior.
> >
> > Ryanne
> >
> > On Fri, Mar 15, 2019, 6:08 AM Федор Чернилин 
> > wrote:
> >
> > > I also noticed another important thing now. Message which used for join
> > is
> > > uncommitted. I understood it with the help of consumer's setting
> > isolation
> > > level - read_committed. The message got into the topic using the same
> > > stream app. Remind that stream app has processing guarantee
> > > = exactly_once. How could this happen?
> > >
> >
>


Re: Kafka Streams Join does not work

2019-03-15 Thread Федор Чернилин
Thanks, I understand how the consumer works. The main question is related to
why the join did not work and how it happened that only one message
remained uncommitted.

Fri, Mar 15, 2019 at 16:29, Ryanne Dolan :

> Hello! When using exactly-once semantics, uncommitted or aborted records
> are skipped by the consumer as if they don't exist.
>
> When inspecting the topic manually, use isolation.level=read_committed to
> get the same behavior.
>
> Ryanne
>
> On Fri, Mar 15, 2019, 6:08 AM Федор Чернилин 
> wrote:
>
> > I also noticed another important thing now. Message which used for join
> is
> > uncommitted. I understood it with the help of consumer's setting
> isolation
> > level - read_committed. The message got into the topic using the same
> > stream app. Remind that stream app has processing guarantee
> > = exactly_once. How could this happen?
> >
>


Re: Kafka Streams Join does not work

2019-03-15 Thread Ryanne Dolan
Hello! When using exactly-once semantics, uncommitted or aborted records
are skipped by the consumer as if they don't exist.

When inspecting the topic manually, use isolation.level=read_committed to
get the same behavior.
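
For example, with the plain Java consumer (deserializers and group id are
placeholders):

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-inspection");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// skip records from uncommitted or aborted transactions, just like Streams' consumers do
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");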

Ryanne

On Fri, Mar 15, 2019, 6:08 AM Федор Чернилин 
wrote:

> I also noticed another important thing now. Message which used for join is
> uncommitted. I understood it with the help of consumer's setting isolation
> level - read_committed. The message got into the topic using the same
> stream app. Remind that stream app has processing guarantee
> = exactly_once. How could this happen?
>


Re: Kafka Streams Join does not work

2019-03-15 Thread Федор Чернилин
I also noticed another important thing now. The message which is used for the join is
uncommitted. I understood it with the help of the consumer's isolation.level
setting - read_committed. The message got into the topic using the same
stream app. As a reminder, the stream app has processing.guarantee
= exactly_once. How could this happen?


Re: Kafka Streams new release

2018-07-10 Thread Ofir Manor
From the release plan:
  " While the target release date is fixed at ~2w after code freeze, RCs
will roll out as needed until the release vote passes"
   https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=80448820
You can follow the voting threads on this mailing list - the current thread
is "[VOTE] 2.0.0 RC2"
Once a vote for RC passes, that RC will be released as the 2.0.0 version.

Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Wed, Jul 11, 2018 at 9:21 AM, Ayushi Sharma  wrote:

> When is the next Kafka Streams release (Kafka Streams 2.0.0), which was
> tentatively scheduled for June 26?
>


Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Damian Guy
Yeah, right. Sorry I missed that. The JIRA doesn't need a KIP.

On Tue, 1 Aug 2017 at 15:20 Paolo Patierno  wrote:

> Hi Damian,
>
>
> changing the print() method for sure needs a KIP but I guess there is some
> reason we don't know why they decided to not have a fluent API for that.
>
> Regarding my JIRA I don't think a KIP is required, it's just internal
> stuff ... no ?
>
>
> Thanks
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> ________
> From: Damian Guy 
> Sent: Tuesday, August 1, 2017 2:11 PM
> To: dev@kafka.apache.org
> Subject: Re: Kafka Streams debugging with "no fluent" API choice
>
> Hi Paolo,
>
> The change would require a KIP as it is a public API change.  I don't see
> any harm in making the change, but I also don't think it is that difficult
> to use peek to achieve the same thing.
>
> Thanks,
> Damian
>
> On Tue, 1 Aug 2017 at 13:52 Paolo Patierno  wrote:
>
> > Thanks Damian,
> >
> >
> > I knew about that but you have to write the code for printing by
> yourself.
> > Of course you can do that even with the print() without using the default
> > keyvalue mapper but passing a custom one.
> >
> > At same time if you want to print you should use a Serdes for key and
> > value if they are bytes and it's something which happens for free with
> > print() passing Serdes instances.
> >
> >
> > Another point is ...
> >
> > as both foreach() and peek() methods relies on the KStreamPeek node, it
> > could be the same for the print() method which uses a KStreamPrint that
> is
> > a special KStreamPeek with forwardDownStream = false and providing the
> > usage of Serdes. For this I have opened the following JIRA :
> >
> >
> > https://issues.apache.org/jira/browse/KAFKA-5684
> >
> >
> > What do you think ?
> >
> >
> > Thanks
> >
> >
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> >
> > Twitter : @ppatierno<http://twitter.com/ppatierno>
> > Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> > Blog : DevExperience<http://paolopatierno.wordpress.com/>
> >
> >
> > 
> > From: Damian Guy 
> > Sent: Tuesday, August 1, 2017 12:11 PM
> > To: dev@kafka.apache.org
> > Subject: Re: Kafka Streams debugging with "no fluent" API choice
> >
> > I don't know specifically why this is removed, however if you want to get
> > the same functionality you can use peek, i.e:
> >
> > stream.map(...).peek(...).filter(..)
> >
> > You can log the key values out in the peek call.
> >
> > On Tue, 1 Aug 2017 at 11:48 Paolo Patierno  wrote:
> >
> > > Hi guys,
> > >
> > >
> > > I was thinking about Kafka Streams debug features and why the print()
> > > method overloads didn't return a KStream, in order to have a fluent DSL
> > > construction even during debugging, but just void.
> > >
> > > Then I came across this PR :
> > >
> > >
> > > https://github.com/apache/kafka/pull/1187
> > >
> > >
> > > May I ask why the form :
> > >
> > >
> > > stream1 = source.map(...).filter(...);
> > > stream1.print();   // this is for debugging, deleted before moving to
> > > production
> > >
> > > stream2 = stream1.transform(...);
> > > stream2.print();   // this is for debugging, deleted before moving to
> > > production
> > >
> > > was considered better than the fluent one:
> > >
> > >
> > > source.map(...).filter(...)
> > >   .print()   // this is for debugging, deleted before moving to
> > > production
> > >   .transform(...)
> > >   .print();   // this is for debugging, deleted before moving
> to
> > > production
> > >
> > >
> > > In this first case the user has to break the topology for printing and
> > > after that, when the code works fine, he has to rewrite the code for
> > > avoiding these broken processors having a fluent construction.
> > >
> > > In the second solution, the user has just to remove the print() calls
> > > without touching the other parts of the code.
> > >
> > > I really liked the first solution (it was something I was going to
> > propose
> > > before coming across the PR :-)).
> > >
> > >
> > > Why this preference on the first one ?
> > >
> > >
> > > Thanks
> > >
> > >
> > >
> > > Paolo Patierno
> > > Senior Software Engineer (IoT) @ Red Hat
> > > Microsoft MVP on Windows Embedded & IoT
> > > Microsoft Azure Advisor
> > >
> > > Twitter : @ppatierno<http://twitter.com/ppatierno>
> > > Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> > > Blog : DevExperience<http://paolopatierno.wordpress.com/>
> > >
> >
>


Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Paolo Patierno
Hi Damian,


changing the print() method for sure needs a KIP, but I guess there is some 
reason, which we don't know, why they decided not to have a fluent API for that.

Regarding my JIRA I don't think a KIP is required, it's just internal stuff ... 
no ?


Thanks


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Damian Guy 
Sent: Tuesday, August 1, 2017 2:11 PM
To: dev@kafka.apache.org
Subject: Re: Kafka Streams debugging with "no fluent" API choice

Hi Paolo,

The change would require a KIP as it is a public API change.  I don't see
any harm in making the change, but I also don't think it is that difficult
to use peek to achieve the same thing.

Thanks,
Damian

On Tue, 1 Aug 2017 at 13:52 Paolo Patierno  wrote:

> Thanks Damian,
>
>
> I knew about that but you have to write the code for printing by yourself.
> Of course you can do that even with the print() without using the default
> keyvalue mapper but passing a custom one.
>
> At same time if you want to print you should use a Serdes for key and
> value if they are bytes and it's something which happens for free with
> print() passing Serdes instances.
>
>
> Another point is ...
>
> as both foreach() and peek() methods relies on the KStreamPeek node, it
> could be the same for the print() method which uses a KStreamPrint that is
> a special KStreamPeek with forwardDownStream = false and providing the
> usage of Serdes. For this I have opened the following JIRA :
>
>
> https://issues.apache.org/jira/browse/KAFKA-5684
>
>
> What do you think ?
>
>
> Thanks
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Damian Guy 
> Sent: Tuesday, August 1, 2017 12:11 PM
> To: dev@kafka.apache.org
> Subject: Re: Kafka Streams debugging with "no fluent" API choice
>
> I don't know specifically why this is removed, however if you want to get
> the same functionality you can use peek, i.e:
>
> stream.map(...).peek(...).filter(..)
>
> You can log the key values out in the peek call.
>
> On Tue, 1 Aug 2017 at 11:48 Paolo Patierno  wrote:
>
> > Hi guys,
> >
> >
> > I was thinking about Kafka Streams debug features and why the print()
> > method overloads didn't return a KStream, in order to have a fluent DSL
> > construction even during debugging, but just void.
> >
> > Then I came across this PR :
> >
> >
> > https://github.com/apache/kafka/pull/1187
> >
> >
> > May I ask why the form :
> >
> >
> > stream1 = source.map(...).filter(...);
> > stream1.print();   // this is for debugging, deleted before moving to
> > production
> >
> > stream2 = stream1.transform(...);
> > stream2.print();   // this is for debugging, deleted before moving to
> > production
> >
> > was considered better than the fluent one:
> >
> >
> > source.map(...).filter(...)
> >   .print()   // this is for debugging, deleted before moving to
> > production
> >   .transform(...)
> >   .print();   // this is for debugging, deleted before moving to
> > production
> >
> >
> > In this first case the user has to break the topology for printing and
> > after that, when the code works fine, he has to rewrite the code for
> > avoiding these broken processors having a fluent construction.
> >
> > In the second solution, the user has just to remove the print() calls
> > without touching the other parts of the code.
> >
> > I really liked the first solution (it was something I was going to
> propose
> > before coming across the PR :-)).
> >
> >
> > Why this preference on the first one ?
> >
> >
> > Thanks
> >
> >
> >
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> >
> > Twitter : @ppatierno<http://twitter.com/ppatierno>
> > Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> > Blog : DevExperience<http://paolopatierno.wordpress.com/>
> >
>


Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Damian Guy
Hi Paolo,

The change would require a KIP as it is a public API change.  I don't see
any harm in making the change, but I also don't think it is that difficult
to use peek to achieve the same thing.

Thanks,
Damian

On Tue, 1 Aug 2017 at 13:52 Paolo Patierno  wrote:

> Thanks Damian,
>
>
> I knew about that but you have to write the code for printing by yourself.
> Of course you can do that even with the print() without using the default
> keyvalue mapper but passing a custom one.
>
> At same time if you want to print you should use a Serdes for key and
> value if they are bytes and it's something which happens for free with
> print() passing Serdes instances.
>
>
> Another point is ...
>
> as both foreach() and peek() methods relies on the KStreamPeek node, it
> could be the same for the print() method which uses a KStreamPrint that is
> a special KStreamPeek with forwardDownStream = false and providing the
> usage of Serdes. For this I have opened the following JIRA :
>
>
> https://issues.apache.org/jira/browse/KAFKA-5684
>
>
> What do you think ?
>
>
> Thanks
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>
>
> 
> From: Damian Guy 
> Sent: Tuesday, August 1, 2017 12:11 PM
> To: dev@kafka.apache.org
> Subject: Re: Kafka Streams debugging with "no fluent" API choice
>
> I don't know specifically why this is removed, however if you want to get
> the same functionality you can use peek, i.e:
>
> stream.map(...).peek(...).filter(..)
>
> You can log the key values out in the peek call.
>
> On Tue, 1 Aug 2017 at 11:48 Paolo Patierno  wrote:
>
> > Hi guys,
> >
> >
> > I was thinking about Kafka Streams debug features and why the print()
> > method overloads didn't return a KStream, in order to have a fluent DSL
> > construction even during debugging, but just void.
> >
> > Then I came across this PR :
> >
> >
> > https://github.com/apache/kafka/pull/1187
> >
> >
> > May I ask why the form :
> >
> >
> > stream1 = source.map(...).filter(...);
> > stream1.print();   // this is for debugging, deleted before moving to
> > production
> >
> > stream2 = stream1.transform(...);
> > stream2.print();   // this is for debugging, deleted before moving to
> > production
> >
> > was considered better than the fluent one:
> >
> >
> > source.map(...).filter(...)
> >   .print()   // this is for debugging, deleted before moving to
> > production
> >   .transform(...)
> >   .print();   // this is for debugging, deleted before moving to
> > production
> >
> >
> > In this first case the user has to break the topology for printing and
> > after that, when the code works fine, he has to rewrite the code for
> > avoiding these broken processors having a fluent construction.
> >
> > In the second solution, the user has just to remove the print() calls
> > without touching the other parts of the code.
> >
> > I really liked the first solution (it was something I was going to
> propose
> > before coming across the PR :-)).
> >
> >
> > Why this preference on the first one ?
> >
> >
> > Thanks
> >
> >
> >
> > Paolo Patierno
> > Senior Software Engineer (IoT) @ Red Hat
> > Microsoft MVP on Windows Embedded & IoT
> > Microsoft Azure Advisor
> >
> > Twitter : @ppatierno<http://twitter.com/ppatierno>
> > Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> > Blog : DevExperience<http://paolopatierno.wordpress.com/>
> >
>


Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Paolo Patierno
Thanks Damian,


I knew about that, but you have to write the code for printing yourself. Of 
course you can do that even with print(), not using the default key-value 
mapper but passing a custom one.

At the same time, if you want to print, you need a Serdes for key and value if 
they are bytes, and that is something which happens for free with print() when 
passing Serdes instances.


Another point is ...

as both the foreach() and peek() methods rely on the KStreamPeek node, it could 
be the same for the print() method, which uses a KStreamPrint that is a special 
KStreamPeek with forwardDownStream = false and provides the usage of Serdes. 
For this I have opened the following JIRA:


https://issues.apache.org/jira/browse/KAFKA-5684


What do you think ?


Thanks


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Damian Guy 
Sent: Tuesday, August 1, 2017 12:11 PM
To: dev@kafka.apache.org
Subject: Re: Kafka Streams debugging with "no fluent" API choice

I don't know specifically why this is removed, however if you want to get
the same functionality you can use peek, i.e:

stream.map(...).peek(...).filter(..)

You can log the key values out in the peek call.

On Tue, 1 Aug 2017 at 11:48 Paolo Patierno  wrote:

> Hi guys,
>
>
> I was thinking about Kafka Streams debugging features and wondering why the
> print() method overloads return void instead of a KStream, which would allow
> a fluent DSL construction even while debugging.
>
> Then I came across this PR :
>
>
> https://github.com/apache/kafka/pull/1187
>
>
> May I ask why the form :
>
>
> stream1 = source.map(...).filter(...);
> stream1.print();   // this is for debugging, deleted before moving to
> production
>
> stream2 = stream1.transform(...);
> stream2.print();   // this is for debugging, deleted before moving to
> production
>
> was considered better than the fluent one :
>
>
> source.map(...).filter(...)
>   .print()   // this is for debugging, deleted before moving to
> production
>   .transform(...)
>   .print();   // this is for debugging, deleted before moving to
> production
>
>
> In the first case the user has to break the topology in order to print and,
> once the code works, rewrite it to remove those intermediate variables and
> restore the fluent construction.
>
> In the second solution, the user has just to remove the print() calls
> without touching the other parts of the code.
>
> I really liked the fluent solution (it was something I was going to propose
> before coming across the PR :-)).
>
>
> Why was the first form preferred ?
>
>
> Thanks
>
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>


Re: Kafka Streams debugging with "no fluent" API choice

2017-08-01 Thread Damian Guy
I don't know specifically why this was removed; however, if you want to get
the same functionality you can use peek, i.e.:

stream.map(...).peek(...).filter(...)

You can log the key values out in the peek call.
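
For example, a rough sketch (topic names are placeholders) of interleaving peek() between operators while keeping the fluent chain intact:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("input-topic");  // default serdes assumed

source.map((key, value) -> KeyValue.pair(key, value.toUpperCase()))
      .peek((key, value) -> System.out.println("after map: " + key + " -> " + value))
      .filter((key, value) -> !value.isEmpty())
      .peek((key, value) -> System.out.println("after filter: " + key + " -> " + value))
      .to("output-topic");

Removing the two peek() lines restores the production topology without touching anything else.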

On Tue, 1 Aug 2017 at 11:48 Paolo Patierno  wrote:

> Hi guys,
>
>
> I was thinking about Kafka Streams debugging features and wondering why the
> print() method overloads return void instead of a KStream, which would allow
> a fluent DSL construction even while debugging.
>
> Then I came across this PR :
>
>
> https://github.com/apache/kafka/pull/1187
>
>
> May I ask why the form :
>
>
> stream1 = source.map(...).filter(...);
> stream1.print();   // this is for debugging, deleted before moving to
> production
>
> stream2 = stream1.transform(...);
> stream2.print();   // this is for debugging, deleted before moving to
> production
>
> was considered better than the fluent one :
>
>
> source.map(...).filter(...)
>   .print()   // this is for debugging, deleted before moving to
> production
>   .transform(...)
>   .print();   // this is for debugging, deleted before moving to
> production
>
>
> In the first case the user has to break the topology in order to print and,
> once the code works, rewrite it to remove those intermediate variables and
> restore the fluent construction.
>
> In the second solution, the user has just to remove the print() calls
> without touching the other parts of the code.
>
> I really liked the fluent solution (it was something I was going to propose
> before coming across the PR :-)).
>
>
> Why was the first form preferred ?
>
>
> Thanks
>
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
>


Re: Kafka streams KStream and ktable join issue

2017-06-24 Thread Matthias J. Sax
I thought we drop records with a null key? No?

-Matthias

On 6/23/17 12:25 AM, Damian Guy wrote:
> My guess is it is because the record doesn't have a key, i.e., the key is
> null. We have a fix for this in 0.11, in that we will skip records with a
> null key during restore.
> 
> On Fri, 23 Jun 2017 at 03:57 Matthias J. Sax  wrote:
> 
>> Hi,
>>
>> can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?
>>
>> It's unclear to me how an NPE can occur. It seems to happen within the
>> Streams library. Might be a bug. Not sure atm.
>>
>>
>> -Matthias
>>
>> On 6/22/17 9:43 AM, Shekar Tippur wrote:
>>> Hello,
>>>
>>> I am trying to perform a simple join operation. I am using Kafka 0.10.2
>>>
>>> I have a "raw" table and a "cache" topics and just 1 partition in my
>> local
>>> environment.
>>>
>>> ktable has these entries
>>>
>>> {"Joe": {"location": "US", "gender": "male"}}
>>> {"Julie": {"location": "US", "gender": "female"}}
>>> {"Kawasaki": {"location": "Japan", "gender": "male"}}
>>>
>>> The kstream gets a event
>>>
>>> {"user": "Joe", "custom": {"choice":"vegan"}}
>>>
>>> I want a output as a join
>>>
>>> {"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location":
>> "US",
>>> "gender": "male"}*} }
>>>
>>> I want to take whats in ktable and add to enriched section of the output
>>> stream.
>>>
>>> I have defined serde
>>>
>>> //This is the same serde code from the example.
>>>
>>> final TestStreamsSerializer jsonSerializer = new
>>> TestStreamsSerializer();
>>> final TestStreamsDeserialzer jsonDeserializer = new
>>> TestStreamsDeserialzer();
>>> final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer,
>>> jsonDeserializer);
>>>
>>> //
>>>
>>> KStream raw = builder.stream(Serdes.String(),
>>> jsonSerde, "raw");
>>> KTable  cache = builder.table("cache", "local-cache");
>>>
>>> raw.leftJoin(cache,
>>> (record1, record2) -> record1.get("user") + "-" +
>> record2).to("output");
>>>
>>> I am having trouble understanding how to call the join api.
>>>
>>> With the above code, I seem to get an error:
>>>
>>> [2017-06-22 09:23:31,836] ERROR User provided listener
>>> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
>>> streams-pipe failed on partition assignment
>>> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>>>
>>> java.lang.NullPointerException
>>>
>>> at org.rocksdb.RocksDB.put(RocksDB.java:488)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
>>>
>>> at
>>>
>> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>>>
>>> at
>>>
>> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>>>
>>> at
>>>
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>>>
>>> at
>>>
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>>>
>>> at
>>>
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordin

Re: Kafka streams KStream and ktable join issue

2017-06-23 Thread Damian Guy
My guess is it is because the record doesn't have a key, i.e., the key is
null. We have a fix for this in 0.11, in that we will skip records with a
null key during restore.
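
For anyone hitting this on 0.10.2, a rough workaround sketch (topic names are placeholders and values are simplified to plain strings, so this is not the exact code from the original post; the DSL calls are shown in the newer StreamsBuilder style): keep null keys out of the stream before the join, and remember the table side can still be null for unmatched keys.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> raw =
        builder.stream("raw", Consumed.with(Serdes.String(), Serdes.String()));
KTable<String, String> cache =
        builder.table("cache", Consumed.with(Serdes.String(), Serdes.String()));

raw
    // drop records without a key so neither the join nor any state store sees a null key
    .filter((key, value) -> key != null)
    // left join: 'profile' is null when the key is not (yet) in the table
    .leftJoin(cache, (event, profile) -> event + " / enriched: " + profile)
    .to("output", Produced.with(Serdes.String(), Serdes.String()));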

On Fri, 23 Jun 2017 at 03:57 Matthias J. Sax  wrote:

> Hi,
>
> can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?
>
> It's unclear to me how an NPE can occur. It seems to happen within the
> Streams library. Might be a bug. Not sure atm.
>
>
> -Matthias
>
> On 6/22/17 9:43 AM, Shekar Tippur wrote:
> > Hello,
> >
> > I am trying to perform a simple join operation. I am using Kafka 0.10.2
> >
> > I have a "raw" table and a "cache" topics and just 1 partition in my
> local
> > environment.
> >
> > ktable has these entries
> >
> > {"Joe": {"location": "US", "gender": "male"}}
> > {"Julie": {"location": "US", "gender": "female"}}
> > {"Kawasaki": {"location": "Japan", "gender": "male"}}
> >
> > The kstream gets a event
> >
> > {"user": "Joe", "custom": {"choice":"vegan"}}
> >
> > I want a output as a join
> >
> > {"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location":
> "US",
> > "gender": "male"}*} }
> >
> > I want to take whats in ktable and add to enriched section of the output
> > stream.
> >
> > I have defined serde
> >
> > //This is the same serde code from the example.
> >
> > final TestStreamsSerializer jsonSerializer = new
> > TestStreamsSerializer();
> > final TestStreamsDeserialzer jsonDeserializer = new
> > TestStreamsDeserialzer();
> > final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer,
> > jsonDeserializer);
> >
> > //
> >
> > KStream raw = builder.stream(Serdes.String(),
> > jsonSerde, "raw");
> > KTable  cache = builder.table("cache", "local-cache");
> >
> > raw.leftJoin(cache,
> > (record1, record2) -> record1.get("user") + "-" +
> record2).to("output");
> >
> > I am having trouble understanding how to call the join api.
> >
> > With the above code, I seem to get an error:
> >
> > [2017-06-22 09:23:31,836] ERROR User provided listener
> > org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> > streams-pipe failed on partition assignment
> > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> >
> > java.lang.NullPointerException
> >
> > at org.rocksdb.RocksDB.put(RocksDB.java:488)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> >
> > at
> >
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> >
> > at
> >
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> >
> > at
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> >
> > at
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> >
> > at
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> >
> > at
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> >
> > at
> >

Re: Kafka streams KStream and ktable join issue

2017-06-22 Thread Matthias J. Sax
Hi,

can you reproduce the error reliably? Are you using 0.10.2.0 or 0.10.2.1?

It's unclear to me how an NPE can occur. It seems to happen within the
Streams library. Might be a bug. Not sure atm.


-Matthias

On 6/22/17 9:43 AM, Shekar Tippur wrote:
> Hello,
> 
> I am trying to perform a simple join operation. I am using Kafka 0.10.2
> 
> I have a "raw" table and a "cache" topics and just 1 partition in my local
> environment.
> 
> ktable has these entries
> 
> {"Joe": {"location": "US", "gender": "male"}}
> {"Julie": {"location": "US", "gender": "female"}}
> {"Kawasaki": {"location": "Japan", "gender": "male"}}
> 
> The kstream gets a event
> 
> {"user": "Joe", "custom": {"choice":"vegan"}}
> 
> I want a output as a join
> 
> {"user": "Joe", "custom": {"choice":"vegan","enriched":*{"location": "US",
> "gender": "male"}*} }
> 
> I want to take whats in ktable and add to enriched section of the output
> stream.
> 
> I have defined serde
> 
> //This is the same serde code from the example.
> 
> final TestStreamsSerializer jsonSerializer = new
> TestStreamsSerializer();
> final TestStreamsDeserialzer jsonDeserializer = new
> TestStreamsDeserialzer();
> final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer,
> jsonDeserializer);
> 
> //
> 
> KStream raw = builder.stream(Serdes.String(),
> jsonSerde, "raw");
> KTable  cache = builder.table("cache", "local-cache");
> 
> raw.leftJoin(cache,
> (record1, record2) -> record1.get("user") + "-" + 
> record2).to("output");
> 
> I am having trouble understanding how to call the join api.
> 
> With the above code, I seem to get an error:
> 
> [2017-06-22 09:23:31,836] ERROR User provided listener
> org.apache.kafka.streams.processor.internals.StreamThread$1 for group
> streams-pipe failed on partition assignment
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> 
> java.lang.NullPointerException
> 
> at org.rocksdb.RocksDB.put(RocksDB.java:488)
> 
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.putInternal(RocksDBStore.java:254)
> 
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.access$000(RocksDBStore.java:67)
> 
> at
> org.apache.kafka.streams.state.internals.RocksDBStore$1.restore(RocksDBStore.java:164)
> 
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:242)
> 
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:201)
> 
> at
> org.apache.kafka.streams.processor.internals.AbstractProcessorContext.register(AbstractProcessorContext.java:99)
> 
> at
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:160)
> 
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
> 
> at
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
> 
> at
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:63)
> 
> at
> org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
> 
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
> 
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
> 
> at
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> 
> at
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> 
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> 
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> 
> at
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> 
> [2017-06-22 09:23:31,849] WARN stream-thread [StreamThread-1] Unexpected
> state transition from ASSIGNING_PARTITIONS to NOT_RUNNING.
> (or

Re: Kafka Streams : parallelism related to repartition topic partitions as well

2017-06-20 Thread Matthias J. Sax
Your observation is correct.

The paragraph you quote is not very precise but also not necessarily
wrong. The example is simplified and assumes that there is no
re-partitioning even if it is not mentioned explicitly.
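
To make this concrete, a rough sketch (topic names are placeholders) of a topology like the WordCountDemo mentioned below: the map() changes the key, so the groupByKey()/count() step creates an internal repartition topic, and the number of tasks is the sum of the input-topic and repartition-topic partitions.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> lines = builder.stream("lines");   // e.g. 2 partitions

lines
    // changing the key marks the stream for repartitioning
    .map((key, line) -> KeyValue.pair(line.toLowerCase(), line))
    // the aggregation materializes an internal repartition topic (also 2 partitions)
    .groupByKey()
    .count()
    .toStream()
    .to("counts", Produced.with(Serdes.String(), Serdes.Long()));

// 2 input partitions + 2 repartition partitions -> 4 tasks, so up to 4 busy instances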


-Matthias


On 6/20/17 9:32 AM, Paolo Patierno wrote:
> Hi devs,
> 
> 
> at the following documentation page (by Confluent) I read 
> (http://docs.confluent.io/current/streams/architecture.html#stream-partitions-and-tasks)
>  ...
> 
> 
> "the maximum parallelism at which your application may run is bounded by the 
> maximum number of stream tasks, which itself is determined by maximum number 
> of partitions of the input topic(s) the application is reading from. For 
> example, if your input topic has 5 partitions, then you can run up to 5 
> applications instances"
> 
> but it seems this is not quite true ... I mean ...
> The maximum number of application instances also depends on whether there are 
> "internal" repartition topics in the processor topology.
> I tried the WordCountDemo starting from a topic with 2 partitions. In this 
> case I'm able to run up to 4 application instances while the 5th stays idle.
> That's possible because the map() in the example causes repartitioning 
> (so 1 repartition topic with 2 partitions) ... which means 4 tasks for a total of 
> 4 partitions (2 for the input topic, 2 for the repartition topic) ... and 
> these tasks can run one per application instance.
> Going by the documentation quoted above, the maximum should be just 2 (not 4).
> 
> Do you confirm this ?
> 
> Thanks,
> Paolo
> 
> 
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
> 
> Twitter : @ppatierno
> Linkedin : paolopatierno
> Blog : DevExperience
> 





Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-18 Thread Paolo Patierno
I'm just thinking that emitting output into a topic every X seconds, thanks to the 
windowing, could be a useful feature on its own, without resorting to interactive 
queries, which are really powerful (I love them) but aren't so useful in this scenario.

Using the caching parameter isn't useful in this scenario because it's expressed 
in bytes, not in time.
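
For reference, a rough sketch of the two settings that control how often updates flow downstream today (values are placeholders): one is a size in bytes, the other a commit interval in milliseconds, and neither is tied to window boundaries, which is exactly the limitation above.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "windowed-max-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// size-based: updates are deduplicated in the record cache until this many bytes are buffered
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024);
// time-based, but tied to commits (which also flush the caches), not to window end times
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5 * 1000);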


Let's consider another scenario ...


I have a sensor sending data every 1 second. Let's assume that our stream 
processing application is not online and the source topic is being filled with 
sensor data carrying the related event time.

When the stream processing application comes online I'd like to have a record 
in the final topic every 5 seconds, in order to have a history as well (because 
the application was offline). To be clear ...

Imagine that starting from t = 0 the sensor starts to send data, but the 
application is offline and the topic is filled from t = 0 to t = 12 (with 12 
events, one per second).

At t = 12 the application comes back online and processes the stream, first 
handling data from t = 0 to t = 4 (the first 5 seconds) and putting the result 
into the destination topic, then from t = 5 to t = 9 (the next 5 seconds), and 
so on. If the sensor rate isn't too fast, the application will catch up and 
start processing in real time at some point (it looks like batch processing 
that turns into real-time processing).

This scenario, for example, isn't possible with Spark today because when the 
application comes back online it processes all data from t = 0 to t = 12 
immediately, as if they were a single burst, without considering t as the 
event time for processing.
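
As a concrete starting point for the scenario above, a rough sketch of a 5-second event-time windowed max (topic names and serdes are made up, and the exact windowing API differs across versions); note that Kafka Streams emits an update for a window whenever its max changes, rather than a single final record per window, which is exactly the difference being discussed:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> readings =
        builder.stream("sensor-readings", Consumed.with(Serdes.String(), Serdes.Long()));

readings
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
    .reduce(Math::max)                                   // max value per sensor per 5s window
    .toStream()
    // key the output by "sensor@windowStart"; every update of a window produces a record
    .map((windowedKey, max) -> KeyValue.pair(
            windowedKey.key() + "@" + windowedKey.window().start(), max))
    .to("sensor-max", Produced.with(Serdes.String(), Serdes.Long()));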


I'm thinking aloud, considering some scenarios that could be valuable in the 
IoT space ...


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Michal Borowiecki 
Sent: Sunday, June 18, 2017 9:34 AM
To: dev@kafka.apache.org; Jay Kreps
Cc: us...@kafka.apache.org; Matthias J. Sax
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window


If confusion is the problem, then I totally agree there is no point adding more knobs. 
Perhaps you're right that users don't really want processing-time semantics. 
They just think they want them until they start considering replay/catch-up 
scenarios. I guess people rarely think about those from the start (I sure 
didn't).

Cheers,

Michał

On 16/06/17 17:54, Jay Kreps wrote:
I think the question is when do you actually want processing time semantics? 
There are definitely times when it's safe to assume the two are close enough 
that a little lossiness doesn't matter much, but it is pretty hard to make 
assumptions about when the processing time is, and it has been hard for us to 
think of a use case where it's actually desirable.

I think mostly what we've seen is confusion about the core concepts:

  *   stream -- immutable events that occur
  *   tables (including windows) -- current state of the world

If the root problem is confusion adding knobs never makes it better. If the 
root problem is we're missing important use cases that justify the additional 
knobs then i think it's good to try to really understand them. I think there 
could be use cases around systems that don't take updates, example would be 
email, twitter, and some metrics stores.

One solution that would be less complexity-inducing than allowing new 
semantics, but might help with the use cases we need to collect, would be to 
add a new operator in the DSL. Something like .freezeAfter(30, 
TimeUnit.SECONDS) that collects all updates for a given window, emits and 
enforces a single output 30 seconds after the advancement of stream 
time, and remembers that it has been emitted, suppressing all further output (so the 
output is actually a KStream). This might or might not depend on wall clock 
time. Perhaps this is in fact what you are proposing?

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki 
mailto:michal.borowie...@openbet.com>> wrote:

I wonder if it's a frequent enough use case that Kafka Streams should consider 
providing this out of the box - this was asked for multiple times, right?

Personally, I agree totally with the philosophy of "no final aggregation", as 
expressed by Eno's post, but IMO that is predicated totally on event-time 
semantics.

If users want processing-time semantics then, as the docs already point out, 
there is no such thing as a late-arriving record - every record just falls in 
the currently open window(s), hence the notion of final aggregation makes 
perfec

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-18 Thread Michal Borowiecki
"final" results for
windows for which "window end time" passed.


-Matthias

On 6/15/17 9:21 AM, Paolo Patierno wrote:

Hi Eno,


regarding closing window I think that it's up to the streaming application. 
I mean ...

If I want something like I described, I know that a value outside my 5 seconds window 
will be taken into account for the next processing (in the next 5 seconds). I don't think 
I'm losing a record, I am aware that this record will fall in the next 
"processing" window. Btw I'll take a look at your article ! Thanks !


Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno> 
<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>
<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 3:57 PM
To: us...@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

Yeah, so if you want fewer records, you should actually "not" disable 
cache. If you disable cache you'll get all the records as you described.

About closing windows: if you close a window and a late record arrives that 
should have been in that window, you basically lose the ability to process that 
record. In Kafka Streams we are robust to that, in that we handle late arriving 
records. There is a comparison here for example when we compare it to other 
methods that depend on watermarks or triggers:
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

Eno



On 15 Jun 2017, at 14:57, Paolo Patierno <ppatie...@live.com> wrote:

Hi Eno,


thanks for the reply !

Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
(so disabling cache).

Regarding the interactive query API (I'll take a look), it means that it's 
up to the application to do something we get out of the box with Spark.

May I ask what you mean with "We don’t believe in closing windows" ? 
Isn't it much more code that the user has to write for having the same result ?

I'm exploring Kafka Streams and it's very powerful imho, even because the 
usage is pretty simple, but in this scenario it could fall short of Spark.


Thanks,

Paolo.


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno> 
<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>
<http://paolopatierno.wordpress.com/>



From: Eno Thereska <eno.there...@gmail.com>
Sent: Thursday, June 15, 2017 1:45 PM
To: us...@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

That is indeed correct. We don’t believe in closing windows in Kafka 
Streams.
You could reduce the number of downstream records by using record caches:
http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl

Alternatively you can just query the KTable whenever you want using the 
Interactive Query APIs (so when you query dictates what data you receive), see this:
https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

Thanks
Eno

On Jun 15, 2017, at 2:38 

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Matthias J. Sax
Thanks Michał!

That is very good feedback.


-Matthias

On 6/16/17 2:38 AM, Michal Borowiecki wrote:
> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple
> times, right?
> 
> Personally, I agree totally with the philosophy of "no final
> aggregation", as expressed by Eno's post, but IMO that is predicated
> totally on event-time semantics.
> 
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record
> just falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
> 
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is
> another case where processing-time semantics warrant explicit handling
> in the api - but of course, only if there's sufficient user demand for this.
> 
> What I could imagine is a new type of time window
> (ProcessingTimeWindow?), that if used in an aggregation, the underlying
> processor would force the WallclockTimestampExtractor (KAFKA-4144
> enables that) and would use the system-time punctuation (KIP-138) to
> send the final aggregation value once the window has expired and could
> be configured to not send intermediate updates while the window was open.
> 
> Of course this is just a helper for the users, since they can implement
> it all themselves using the low-level API, as Matthias pointed out
> already. Just seems there's recurring interest in this.
> 
> Again, this only makes sense for processing time semantics. For
> event-time semantics I find the arguments for "no final aggregation"
> totally convincing.
> 
> 
> Cheers,
> 
> Michał
> 
> 
> On 16/06/17 00:08, Matthias J. Sax wrote:
>> Hi Paolo,
>>
>> This SO question might help, too:
>> https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>>
>> For Streams, the basic model is based on "change" and we report updates
>> to the "current" result immediately reducing latency to a minimum.
>>
>> Last, if you say it's going to fall into the next window, you won't get
>> event time semantics but fall back to processing time semantics, which
>> cannot provide exact results.
>>
>> If you really want to trade off correctness versus getting (late)
>> updates and want to use processing time semantics, you should configure
>> the WallclockTimestampExtractor and implement an "update deduplication"
>> operator using table.toStream().transform(). You can attach a state store to
>> your transformer and store all updates there (i.e., newer updates overwrite
>> older ones). Punctuations allow you to emit "final" results for
>> windows for which the "window end time" has passed.
>>
>>
>> -Matthias
>>
>> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>>> Hi Eno,
>>>
>>>
>>> regarding closing window I think that it's up to the streaming application. 
>>> I mean ...
>>>
>>> If I want something like I described, I know that a value outside my 5 
>>> seconds window will be taken into account for the next processing (in the 
>>> next 5 seconds). I don't think I'm losing a record, I am aware that this 
>>> record will fall in the next "processing" window. Btw I'll take a look at 
>>> your article ! Thanks !
>>>
>>>
>>> Paolo
>>>
>>>
>>> Paolo Patierno
>>> Senior Software Engineer (IoT) @ Red Hat
>>> Microsoft MVP on Windows Embedded & IoT
>>> Microsoft Azure Advisor
>>>
>>> Twitter : @ppatierno<http://twitter.com/ppatierno>
>>> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
>>> Blog : DevExperience<http://paolopatierno.wordpress.com/>
>>>
>>>
>>> 
>>> From: Eno Thereska 
>>> Sent: Thursday, June 15, 2017 3:57 PM
>>> To: us...@kafka.apache.org
>>> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>>>
>>> Hi Paolo,
>>>
>>> Yeah, so if you want fewer records, you should actually "not" disable 
>>> cache. If you disable cache you'll get all the records as you described.
>>>
>>> About closing windows: if you close a window and a late rec

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Jay Kreps
> next 5 seconds). I don't think I'm losing a record, I am aware that this 
> record will fall in the next "processing" window. Btw I'll take a look at 
> your article ! Thanks !
>
>
> Paolo
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno> 
> <http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno> 
> <http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/> 
> <http://paolopatierno.wordpress.com/>
>
>
> 
> From: Eno Thereska  
> Sent: Thursday, June 15, 2017 3:57 PM
> To: us...@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>
> Hi Paolo,
>
> Yeah, so if you want fewer records, you should actually "not" disable cache. 
> If you disable cache you'll get all the records as you described.
>
> About closing windows: if you close a window and a late record arrives that 
> should have been in that window, you basically lose the ability to process 
> that record. In Kafka Streams we are robust to that, in that we handle late 
> arriving records. There is a comparison here for example when we compare it 
> to other methods that depend on watermarks or triggers: 
> https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ 
>
> Eno
>
>
>
> On 15 Jun 2017, at 14:57, Paolo Patierno  
>  wrote:
>
Hi Eno,
>
>
> thanks for the reply !
>
> Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 
> (so disabling cache).
>
> Regarding the interactive query API (I'll take a look), it means that it's up 
> to the application to do something we get out of the box with Spark.
>
> May I ask what you mean with "We don’t believe in closing windows" ? Isn't 
> it much more code that the user has to write for having the same result ?
>
> I'm exploring Kafka Streams and it's very powerful imho, even because the 
> usage is pretty simple, but in this scenario it could fall short of Spark.
>
>
> Thanks,
>
> Paolo.
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : @ppatierno<http://twitter.com/ppatierno> 
> <http://twitter.com/ppatierno>
> Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno> 
> <http://it.linkedin.com/in/paolopatierno>
> Blog : DevExperience<http://paolopatierno.wordpress.com/> 
> <http://paolopatierno.wordpress.com/>
>
>
> 
> From: Eno Thereska  
> Sent: Thursday, June 15, 2017 1:45 PM
> To: us...@kafka.apache.org
> Subject: Re: Kafka Streams vs Spark Streaming : reduce by window
>
> Hi Paolo,
>
> That is indeed correct. We don’t believe in closing windows in Kafka Streams.
> You could reduce the number of downstream records by using record caches: 
> http://docs.confluent.io/current/streams/developer-guide.html#record-caches-in-the-dsl
>
> Alternatively you can just query the KTable whenever you want using the 
> Interactive Query APIs (so when you query dictates what  data you receive), 
> see this 
> https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/
>
> Thanks
> Eno
>
> On Jun 15, 2017, at 2:38 PM, Paolo Patierno  
>  wrote:
>
> Hi,
>
>
> using the streams library I noticed a difference (or there is a lack of 
> knowledge on my side)with Apache Spark.
>
> Imagine following scenario ...
>
>
> I have a source topic where numeric values come in and I want to check the 
> maximum value in the latest 5 seconds but ... putting the max value into a 
> destination topic every 5 seconds.
>
> This is what happens with reduceByWindow method in Spark.
>
> I'm using reduce on a KStream here that 


Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Michal Borowiecki
I wonder if it's a frequent enough use case that Kafka Streams should 
consider providing this out of the box - this was asked for multiple 
times, right?


Personally, I agree totally with the philosophy of "no final 
aggregation", as expressed by Eno's post, but IMO that is predicated 
totally on event-time semantics.


If users want processing-time semantics then, as the docs already point 
out, there is no such thing as a late-arriving record - every record 
just falls in the currently open window(s), hence the notion of final 
aggregation makes perfect sense, from the usability point of view.


The single abstraction of "stream time" proves leaky in some cases (e.g. 
for punctuate method - being addressed in KIP-138). Perhaps this is 
another case where processing-time semantics warrant explicit handling 
in the api - but of course, only if there's sufficient user demand for this.


What I could imagine is a new type of time window 
(ProcessingTimeWindow?), that if used in an aggregation, the underlying 
processor would force the WallclockTimestampExtractor (KAFKA-4144 
enables that) and would use the system-time punctuation (KIP-138) to 
send the final aggregation value once the window has expired and could 
be configured to not send intermediate updates while the window was open.


Of course this is just a helper for the users, since they can implement 
it all themselves using the low-level API, as Matthias pointed out 
already. Just seems there's recurring interest in this.
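
For the record, a rough sketch of what that low-level implementation could look like with the newer (2.x-era) wall-clock punctuation API (the store name, topic names and Long value type are made up, and this is only one possible wiring): keep the latest value per key in a store and emit it on a wall-clock punctuation, suppressing the intermediate updates.

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// assumes a store named "latest-per-key" was added via builder.addStateStore(...)
class FinalResultTransformer implements Transformer<String, Long, KeyValue<String, Long>> {

    private KeyValueStore<String, Long> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        store = (KeyValueStore<String, Long>) context.getStateStore("latest-per-key");
        // every 30s of wall-clock time, emit the current value per key and clear it
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            try (KeyValueIterator<String, Long> it = store.all()) {
                while (it.hasNext()) {
                    final KeyValue<String, Long> entry = it.next();
                    context.forward(entry.key, entry.value);
                    store.delete(entry.key);
                }
            }
        });
    }

    @Override
    public KeyValue<String, Long> transform(final String key, final Long value) {
        store.put(key, value);   // newer updates overwrite older ones
        return null;             // suppress intermediate output
    }

    @Override
    public void close() { }
}

// wiring (sketch): table.toStream().transform(FinalResultTransformer::new, "latest-per-key")
//                       .to("final-output");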


Again, this only makes sense for processing time semantics. For 
event-time semantics I find the arguments for "no final aggregation" 
totally convincing.



Cheers,

Michał


On 16/06/17 00:08, Matthias J. Sax wrote:

Hi Paolo,

This SO question might help, too:
https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable

For Streams, the basic model is based on "change" and we report updates
to the "current" result immediately reducing latency to a minimum.

Last, if you say it's going to fall into the next window, you won't get
event time semantics but fall back to processing time semantics, which
cannot provide exact results.

If you really want to trade off correctness versus getting (late)
updates and want to use processing time semantics, you should configure
the WallclockTimestampExtractor and implement an "update deduplication"
operator using table.toStream().transform(). You can attach a state store to
your transformer and store all updates there (i.e., newer updates overwrite
older ones). Punctuations allow you to emit "final" results for
windows for which the "window end time" has passed.


-Matthias

On 6/15/17 9:21 AM, Paolo Patierno wrote:

Hi Eno,


regarding closing window I think that it's up to the streaming application. I 
mean ...

If I want something like I described, I know that a value outside my 5 seconds window 
will be taken into account for the next processing (in the next 5 seconds). I don't think 
I'm losing a record, I am aware that this record will fall in the next 
"processing" window. Btw I'll take a look at your article ! Thanks !


Paolo


Paolo Patierno
Senior Software Engineer (IoT) @ Red Hat
Microsoft MVP on Windows Embedded & IoT
Microsoft Azure Advisor

Twitter : @ppatierno<http://twitter.com/ppatierno>
Linkedin : paolopatierno<http://it.linkedin.com/in/paolopatierno>
Blog : DevExperience<http://paolopatierno.wordpress.com/>



From: Eno Thereska 
Sent: Thursday, June 15, 2017 3:57 PM
To: us...@kafka.apache.org
Subject: Re: Kafka Streams vs Spark Streaming : reduce by window

Hi Paolo,

Yeah, so if you want fewer records, you should actually "not" disable cache. If 
you disable cache you'll get all the records as you described.

About closing windows: if you close a window and a late record arrives that should 
have been in that window, you basically lose the ability to process that record. In 
Kafka Streams we are robust to that, in that we handle late arriving records. There 
is a comparison here for example when we compare it to other methods that depend on 
watermarks or triggers: 
https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ 
<https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/>

Eno



On 15 Jun 2017, at 14:57, Paolo Patierno  wrote:

Hi Eno,


thanks for the reply !

Regarding the cache I'm already using CACHE_MAX_BYTES_BUFFERING_CONFIG = 0 (so 
disabling cache).

Regarding the interactive query API (I'll take a look), it means that it's up to 
the application to do something we get out of the box with Spark.

May I ask what you mean with "We don’t believe in closing windows" ? Isn't 
it much more code that the user has to write for having the 

Re: Kafka-Streams: Cogroup

2017-05-01 Thread Guozhang Wang
Kyle,

What's your apache id? I can grant you the permission.


Guozhang


On Sat, Apr 29, 2017 at 7:33 AM, Kyle Winkelman 
wrote:

> I don't seem to have permission. When logged in I can neither edit the
> main page nor create an additional KIP.
>
> Thanks,
> Kyle
>
> On Thu, Apr 27, 2017 at 12:35 PM, Eno Thereska 
> wrote:
>
>> Hi Kyle,
>>
>> I believe Guozhang has now given you permission to edit the KIP wiki at
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+
>> Improvement+Proposals. Could you see if you can add this there?
>>
>> Many thanks
>> Eno
>>
>> On Wed, Apr 26, 2017 at 6:00 PM, Kyle Winkelman > > wrote:
>>
>>> Thank you for your reply.
>>>
>>> I have attached my first attempt at writing a KIP and I was wondering if
>>> you could review it and share your thoughts.
>>>
>>> Going forward I would like to create this KIP. I was wondering whom I
>>> should ask to get the necessary permissions on the wiki. Username:
>>> winkelman.kyle
>>>
>>>
>>>
>>> On Fri, Apr 21, 2017 at 3:15 PM, Eno Thereska 
>>> wrote:
>>>
 Hi Kyle,

 Sorry for the delay in replying. I think it's worth doing a KIP for
 this one. One super helpful thing with KIPs is to list a few more scenarios
 that would benefit from this approach. In particular it seems the main
 benefit is from reducing the number of state stores. Does this necessarily
 reduce the number of IOs to the stores (number of puts/gets), or the extra
 space overheads with multiple stores? Quantifying that a bit would help.

 To answer your original questions:

 >The problem I am having with this approach is understanding if there
 is a race condition. Obviously the source topics would be copartitioned.
 But would it be multithreaded and possibly cause one of the processors to
 grab patient 1 at the same time a different processor has grabbed patient 
 1?


 I don't think there will be a problem here. A processor cannot be
 accessed by multiple threads in Kafka Streams.


 >My understanding is that for each partition there would be a single
 complete set of processors and a new incoming record would go completely
 through the processor topology from a source node to a sink node before the
 next one is sent through. Is this correct?

 This is mostly true, however if caching is enabled (for dedupping, see
 KIP-63), then a record may reside in a cache before going to the sink.
 Meanwhile another record can come in. So multiple records can be in the
 topology at the same time.

 Thanks
 Eno





 On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman <
 winkelman.k...@gmail.com> wrote:

> Eno,
> Thanks for the response. The figure was just a restatement of my
> questions. I have made an attempt at a low level processor and it appears
> to work but it isn't very pretty and was hoping for something at the
> streams api level.
>
> I have written some code to show an example of how I see the Cogroup
> working in kafka.
>
> First the KGroupedStream would have a cogroup method that takes the
> initializer and the aggregator for that specific KGroupedStream. This 
> would
> return a KCogroupedStream that has 2 methods one to add more
> KGroupedStream, Aggregator pairs and one to complete the construction and
> return a KTable.
>
> builder.stream("topic").groupByKey ().cogroup(Initializer,
> Aggregator, aggValueSerde, storeName).cogroup(groupedStream1,
> Aggregator1).cogroup(groupedStream2, Aggregator2).aggregate();
>
> Behind the scenes we create a KStreamAggregate for each
> KGroupedStream, Aggregator pair. Then a final pass through processor to
> pass on the aggregate values. This gives us a KTable backed by a single
> store that is used in all of the processors.
>
> Please let me know if this is something you think would add value to
> kafka streams. And I will try to create a KIP to foster more 
> communication.
>
> You can take a look at what I have. I think it's missing a fair amount
> but it's a good start. I took the doAggregate method in KGroupedStream as
> my starting point and expanded on it for multiple streams:
> https://github.com/KyleWinkelman/kafka/tree/cogroup
>
>

>>>
>>
>


-- 
-- Guozhang


Re: Kafka-Streams: Cogroup

2017-04-29 Thread Kyle Winkelman
I don't seem to have permission. When logged in I can neither edit the main
page nor create an additional KIP.

Thanks,
Kyle

On Thu, Apr 27, 2017 at 12:35 PM, Eno Thereska 
wrote:

> Hi Kyle,
>
> I believe Guozhang has now given you permission to edit the KIP wiki at
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Improvement+Proposals. Could you see if you can add this there?
>
> Many thanks
> Eno
>
> On Wed, Apr 26, 2017 at 6:00 PM, Kyle Winkelman 
> wrote:
>
>> Thank you for your reply.
>>
>> I have attached my first attempt at writing a KIP and I was wondering if
>> you could review it and share your thoughts.
>>
>> Going forward I would like to create this KIP. I was wondering whom I
>> should ask to get the necessary permissions on the wiki. Username:
>> winkelman.kyle
>>
>>
>>
>> On Fri, Apr 21, 2017 at 3:15 PM, Eno Thereska 
>> wrote:
>>
>>> Hi Kyle,
>>>
>>> Sorry for the delay in replying. I think it's worth doing a KIP for this
>>> one. One super helpful thing with KIPs is to list a few more scenarios that
>>> would benefit from this approach. In particular it seems the main benefit
>>> is from reducing the number of state stores. Does this necessarily reduce
>>> the number of IOs to the stores (number of puts/gets), or the extra space
>> overheads with multiple stores? Quantifying that a bit would help.
>>>
>>> To answer your original questions:
>>>
>>> >The problem I am having with this approach is understanding if there is
>>> a race condition. Obviously the source topics would be copartitioned. But
>>> would it be multithreaded and possibly cause one of the processors to grab
>>> patient 1 at the same time a different processor has grabbed patient 1?
>>>
>>>
>>> I don't think there will be a problem here. A processor cannot be
>>> accessed by multiple threads in Kafka Streams.
>>>
>>>
>>> >My understanding is that for each partition there would be a single
>>> complete set of processors and a new incoming record would go completely
>>> through the processor topology from a source node to a sink node before the
>>> next one is sent through. Is this correct?
>>>
>>> This is mostly true, however if caching is enabled (for dedupping, see
>>> KIP-63), then a record may reside in a cache before going to the sink.
>>> Meanwhile another record can come in. So multiple records can be in the
>>> topology at the same time.
>>>
>>> Thanks
>>> Eno
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman <
>>> winkelman.k...@gmail.com> wrote:
>>>
 Eno,
 Thanks for the response. The figure was just a restatement of my
 questions. I have made an attempt at a low level processor and it appears
 to work but it isn't very pretty and was hoping for something at the
 streams api level.

 I have written some code to show an example of how I see the Cogroup
 working in kafka.

 First the KGroupedStream would have a cogroup method that takes the
 initializer and the aggregator for that specific KGroupedStream. This would
 return a KCogroupedStream that has 2 methods one to add more
 KGroupedStream, Aggregator pairs and one to complete the construction and
 return a KTable.

 builder.stream("topic").groupByKey ().cogroup(Initializer, Aggregator,
 aggValueSerde, storeName).cogroup(groupedStream1,
 Aggregator1).cogroup(groupedStream2, Aggregator2).aggregate();

 Behind the scenes we create a KStreamAggregate for each KGroupedStream,
 Aggregator pair. Then a final pass through processor to pass on the
 aggregate values. This gives us a KTable backed by a single store that is
 used in all of the processors.

 Please let me know if this is something you think would add value to
 kafka streams. And I will try to create a KIP to foster more communication.

 You can take a look at what I have. I think it's missing a fair amount
 but it's a good start. I took the doAggregate method in KGroupedStream as
 my starting point and expanded on it for multiple streams:
 https://github.com/KyleWinkelman/kafka/tree/cogroup


>>>
>>
>


Re: Kafka-Streams: Cogroup

2017-04-27 Thread Eno Thereska
Hi Kyle,

I believe Guozhang has now given you permission to edit the KIP wiki at
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.
Could you see if you can add this there?

Many thanks
Eno

On Wed, Apr 26, 2017 at 6:00 PM, Kyle Winkelman 
wrote:

> Thank you for your reply.
>
> I have attached my first attempt at writing a KIP and I was wondering if
> you could review it and share your thoughts.
>
> Going forward I would like to create this KIP. I was wondering whom I
> should ask to get the necessary permissions on the wiki. Username:
> winkelman.kyle
>
>
>
> On Fri, Apr 21, 2017 at 3:15 PM, Eno Thereska 
> wrote:
>
>> Hi Kyle,
>>
>> Sorry for the delay in replying. I think it's worth doing a KIP for this
>> one. One super helpful thing with KIPs is to list a few more scenarios that
>> would benefit from this approach. In particular it seems the main benefit
>> is from reducing the number of state stores. Does this necessarily reduce
>> the number of IOs to the stores (number of puts/gets), or the extra space
>> overheads with multiple stores. Quantifying that a bit would help.
>>
>> To answer your original questions:
>>
>> >The problem I am having with this approach is understanding if there is
>> a race condition. Obviously the source topics would be copartitioned. But
>> would it be multithreaded and possibly cause one of the processors to grab
>> patient 1 at the same time a different processor has grabbed patient 1?
>>
>>
>> I don't think there will be a problem here. A processor cannot be
>> accessed by multiple threads in Kafka Streams.
>>
>>
>> >My understanding is that for each partition there would be a single
>> complete set of processors and a new incoming record would go completely
>> through the processor topology from a source node to a sink node before the
>> next one is sent through. Is this correct?
>>
>> This is mostly true; however, if caching is enabled (for deduping, see
>> KIP-63), then a record may reside in a cache before going to the sink.
>> Meanwhile another record can come in. So multiple records can be in the
>> topology at the same time.
>>
>> Thanks
>> Eno
>>
>>
>>
>>
>>
>> On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman > > wrote:
>>
>>> Eno,
>>> Thanks for the response. The figure was just a restatement of my
>>> questions. I have made an attempt at a low level processor and it appears
>>> to work but it isn't very pretty and was hoping for something at the
>>> streams api level.
>>>
>>> I have written some code to show an example of how I see the Cogroup
>>> working in kafka.
>>>
>>> First the KGroupedStream would have a cogroup method that takes the
>>> initializer and the aggregator for that specific KGroupedStream. This would
>>> return a KCogroupedStream that has 2 methods one to add more
>>> KGroupedStream, Aggregator pairs and one to complete the construction and
>>> return a KTable.
>>>
>>> builder.stream("topic").groupByKey().cogroup(Initializer, Aggregator,
>>> aggValueSerde, storeName).cogroup(groupedStream1,
>>> Aggregator1).cogroup(groupedStream2, Aggregator2).aggregate();
>>>
>>> Behind the scenes we create a KStreamAggregate for each KGroupedStream,
>>> Aggregator pair. Then a final pass through processor to pass on the
>>> aggregate values. This gives us a KTable backed by a single store that is
>>> used in all of the processors.
>>>
>>> Please let me know if this is something you think would add value to
>>> kafka streams. And I will try to create a KIP to foster more communication.
>>>
>>> You can take a look at what I have. I think it's missing a fair amount
>>> but it's a good start. I took the doAggregate method in KGroupedStream as
>>> my starting point and expanded on it for multiple streams:
>>> https://github.com/KyleWinkelman/kafka/tree/cogroup
>>>
>>>
>>
>


Re: Kafka-Streams: Cogroup

2017-04-26 Thread Kyle Winkelman
Thank you for your reply.

I have attached my first attempt at writing a KIP and I was wondering if
you could review it and share your thoughts.

Going forward I would like to create this KIP. I was wondering whom I
should ask to get the necessary permissions on the wiki. Username:
winkelman.kyle



On Fri, Apr 21, 2017 at 3:15 PM, Eno Thereska 
wrote:

> Hi Kyle,
>
> Sorry for the delay in replying. I think it's worth doing a KIP for this
> one. One super helpful thing with KIPs is to list a few more scenarios that
> would benefit from this approach. In particular it seems the main benefit
> is from reducing the number of state stores. Does this necessarily reduce
> the number of IOs to the stores (number of puts/gets), or the extra space
> overheads with multiple stores. Quantifying that a bit would help.
>
> To answer your original questions:
>
> >The problem I am having with this approach is understanding if there is a
> race condition. Obviously the source topics would be copartitioned. But
> would it be multithreaded and possibly cause one of the processors to grab
> patient 1 at the same time a different processor has grabbed patient 1?
>
>
> I don't think there will be a problem here. A processor cannot be accessed
> by multiple threads in Kafka Streams.
>
>
> >My understanding is that for each partition there would be a single
> complete set of processors and a new incoming record would go completely
> through the processor topology from a source node to a sink node before the
> next one is sent through. Is this correct?
>
> This is mostly true; however, if caching is enabled (for deduping, see
> KIP-63), then a record may reside in a cache before going to the sink.
> Meanwhile another record can come in. So multiple records can be in the
> topology at the same time.
>
> Thanks
> Eno
>
>
>
>
>
> On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman 
> wrote:
>
>> Eno,
>> Thanks for the response. The figure was just a restatement of my
>> questions. I have made an attempt at a low level processor and it appears
>> to work but it isn't very pretty and was hoping for something at the
>> streams api level.
>>
>> I have written some code to show an example of how I see the Cogroup
>> working in kafka.
>>
>> First the KGroupedStream would have a cogroup method that takes the
>> initializer and the aggregator for that specific KGroupedStream. This would
>> return a KCogroupedStream that has 2 methods one to add more
>> KGroupedStream, Aggregator pairs and one to complete the construction and
>> return a KTable.
>>
>> builder.stream("topic").groupByKey().cogroup(Initializer, Aggregator,
>> aggValueSerde, storeName).cogroup(groupedStream1,
>> Aggregator1).cogroup(groupedStream2, Aggregator2).aggregate();
>>
>> Behind the scenes we create a KStreamAggregate for each KGroupedStream,
>> Aggregator pair. Then a final pass through processor to pass on the
>> aggregate values. This gives us a KTable backed by a single store that is
>> used in all of the processors.
>>
>> Please let me know if this is something you think would add value to
>> kafka streams. And I will try to create a KIP to foster more communication.
>>
>> You can take a look at what I have. I think it's missing a fair amount
>> but it's a good start. I took the doAggregate method in KGroupedStream as
>> my starting point and expanded on it for multiple streams:
>> https://github.com/KyleWinkelman/kafka/tree/cogroup
>>
>>
>
KIP-? - Kafka-Streams Cogroup


Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Released: 
Please keep the discussion on the mailing list rather than commenting on the 
wiki (wiki discussions get unwieldy fast).


Motivation
When multiple streams aggregate together to form a single large object (e.g. a 
shopping website may have a cart stream, a wish-list stream, and a purchases 
stream that together make up a Customer), it is very difficult to accommodate 
this in the Kafka Streams DSL. It generally requires you to group and aggregate 
all of the streams into KTables and then make multiple outer-join calls to end 
up with a KTable holding your desired object. This creates a state store for 
each stream and a long chain of ValueJoiners that each new record must go 
through to reach the final object.
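For illustration, a minimal sketch of that pattern in today's DSL (StreamsBuilder); the topic names and the trivial String "aggregation" are placeholders, and default String serdes are assumed:

```
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

StreamsBuilder builder = new StreamsBuilder();

// One aggregation -- and one state store -- per input stream.
KTable<String, String> carts = builder.<String, String>stream("carts")
        .groupByKey()
        .reduce((agg, value) -> agg + "," + value, Materialized.as("carts-store"));
KTable<String, String> wishLists = builder.<String, String>stream("wish-lists")
        .groupByKey()
        .reduce((agg, value) -> agg + "," + value, Materialized.as("wish-lists-store"));
KTable<String, String> purchases = builder.<String, String>stream("purchases")
        .groupByKey()
        .reduce((agg, value) -> agg + "," + value, Materialized.as("purchases-store"));

// A chain of outer joins (and ValueJoiners) just to assemble the combined "customer" view.
// Each joiner sees null for a side that has no value yet.
KTable<String, String> cartsAndWishLists =
        carts.outerJoin(wishLists, (cart, wishList) -> cart + "|" + wishList);
KTable<String, String> customers =
        cartsAndWishLists.outerJoin(purchases, (soFar, purchase) -> soFar + "|" + purchase);
```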
Creating a cogroup method that uses a single state store will:
1. Reduce the number of gets from state stores. With the multiple-join approach, 
when a new value comes into any of the streams a chain reaction happens in which 
ValueGetters keep calling ValueGetters until all state stores have been accessed.
2. Give a slight performance increase. As described above, all ValueGetters are 
called, which in turn causes all ValueJoiners to be called, forcing a 
recalculation of the current joined value of all the other streams and hurting 
performance.


Public Interfaces
KGroupedStream { //Possibly add support for Windows and Sessions as well.
...
 KCogroupedStream cogroup(final Initializer initializer,
  

Re: Kafka-Streams: Cogroup

2017-04-21 Thread Eno Thereska
Hi Kyle,

Sorry for the delay in replying. I think it's worth doing a KIP for this
one. One super helpful thing with KIPs is to list a few more scenarios that
would benefit from this approach. In particular, it seems the main benefit
is from reducing the number of state stores. Does this necessarily reduce
the number of IOs to the stores (number of puts/gets), or the extra space
overhead of multiple stores? Quantifying that a bit would help.

To answer your original questions:

>The problem I am having with this approach is understanding if there is a
race condition. Obviously the source topics would be copartitioned. But
would it be multithreaded and possibly cause one of the processors to grab
patient 1 at the same time a different processor has grabbed patient 1?


I don't think there will be a problem here. A processor cannot be accessed
by multiple threads in Kafka Streams.


>My understanding is that for each partition there would be a single
complete set of processors and a new incoming record would go completely
through the processor topology from a source node to a sink node before the
next one is sent through. Is this correct?

This is mostly true; however, if caching is enabled (for deduping, see
KIP-63), then a record may reside in a cache before going to the sink.
Meanwhile another record can come in. So multiple records can be in the
topology at the same time.

Thanks
Eno





On Fri, Apr 14, 2017 at 8:16 PM, Kyle Winkelman 
wrote:

> Eno,
> Thanks for the response. The figure was just a restatement of my
> questions. I have made an attempt at a low level processor and it appears
> to work but it isn't very pretty and was hoping for something at the
> streams api level.
>
> I have written some code to show an example of how I see the Cogroup
> working in kafka.
>
> First the KGroupedStream would have a cogroup method that takes the
> initializer and the aggregator for that specific KGroupedStream. This would
> return a KCogroupedStream that has 2 methods one to add more
> KGroupedStream, Aggregator pairs and one to complete the construction and
> return a KTable.
>
> builder.stream("topic").groupByKey().cogroup(Initializer, Aggregator,
> aggValueSerde, storeName).cogroup(groupedStream1, 
> Aggregator1).cogroup(groupedStream2,
> Aggregator2).aggregate();
>
> Behind the scenes we create a KStreamAggregate for each KGroupedStream,
> Aggregator pair. Then a final pass through processor to pass on the
> aggregate values. This gives us a KTable backed by a single store that is
> used in all of the processors.
>
> Please let me know if this is something you think would add value to kafka
> streams. And I will try to create a KIP to foster more communication.
>
> You can take a look at what I have. I think it's missing a fair amount but
> it's a good start. I took the doAggregate method in KGroupedStream as my
> starting point and expanded on it for multiple streams:
> https://github.com/KyleWinkelman/kafka/tree/cogroup
>
>


Re: Kafka-Streams: Cogroup

2017-04-14 Thread Kyle Winkelman
Eno,
Thanks for the response. The figure was just a restatement of my questions.
I have made an attempt at a low-level processor and it appears to work, but
it isn't very pretty, and I was hoping for something at the Streams API level.

I have written some code to show an example of how I see the Cogroup
working in kafka.

First, KGroupedStream would have a cogroup method that takes the
initializer and the aggregator for that specific KGroupedStream. This would
return a KCogroupedStream with two methods: one to add more
KGroupedStream/Aggregator pairs, and one to complete the construction and
return a KTable.

builder.stream("topic").groupByKey ().cogroup(Initializer, Aggregator,
aggValueSerde, storeName).cogroup(groupedStream1,
Aggregator1).cogroup(groupedStream2, Aggregator2).aggregate();

Behind the scenes we create a KStreamAggregate for each KGroupedStream/
Aggregator pair, followed by a final pass-through processor that forwards the
aggregate values. This gives us a KTable backed by a single store that is
used by all of the processors.

Please let me know if this is something you think would add value to Kafka
Streams, and I will try to create a KIP to foster more communication.

You can take a look at what I have. I think it's missing a fair amount but
it's a good start. I took the doAggregate method in KGroupedStream as my
starting point and expanded on it for multiple streams:
https://github.com/KyleWinkelman/kafka/tree/cogroup


Re: Kafka-Streams: Cogroup

2017-04-13 Thread Eno Thereska
Hi Kyle, (cc-ing user list as well)

This could be an interesting scenario. Two things to help us think through it 
some more: 1) it seems you attached a figure, but I cannot open it; 2) what 
about using the low-level Processor API instead of the DSL as approach 3? 
Do you have any thoughts on that?

Thanks
Eno

> On 13 Apr 2017, at 11:26, Winkelman, Kyle G  wrote:
> 
> Hello,
>  
> I am wondering if there is any way to aggregate together many streams at once 
> to build a larger object. Example (Healthcare Domain):
> I have streams of Medical, Pharmacy, and Lab claims. Key is PatientId, Value 
> is a different Avro Record for each stream.
> I was hoping there was a way to supply a single Initializer, () -> new 
> Patient(), and 3 aggregators, (key, value, patient) -> 
> patient.add**Claim(value).
>  
> Currently the only way that I see to do the above use case is by aggregating 
> each individual stream then joining them. This doesn’t scale well with a 
> large number of input streams because for each stream I would be creating 
> another state store.
>  
> I was hoping to get thoughts on a KCogroupedStream api. I have spent a little 
> time conceptualizing it.
>  
> Approach 1:
> In KGroupedStream add a cogroup method that takes the single initializer, a 
> list of other kgroupedstreams, and a list of other aggregators.
> This would then all flow through a single processor and a have a single 
> backing state store.
> The aggregator that the object will get sent to is determined by the 
> context().topic() which we should be able to trace back to one of the 
> kgroupedstreams in the list.
>  
> The problem I am having with this approach is that everything goes through a 
> single processor, and Java doesn’t handle the generic types well. I have to 
> either pass in a list of Type objects for casting each object before sending 
> it to its aggregator, or create aggregators that accept an Object and cast it 
> to the appropriate type.
>  
> Approach 2:
> Create one processor for each aggregator and have a single state store. Then 
> have a single KStreamPassThrough that just passes on the new aggregate value.
> The positive of this is that you know which stream each record is coming from 
> and won’t need to do the context().topic() trick.
>  
> The problem I am having with this approach is understanding if there is a 
> race condition. Obviously the source topics would be copartitioned. But would 
> it be multithreaded and possibly cause one of the processors to grab patient 
> 1 at the same time a different processor has grabbed patient 1?
> My understanding is that for each partition there would be a single complete 
> set of processors and a new incoming record would go completely through the 
> processor topology from a source node to a sink node before the next one is 
> sent through. Is this correct?
>  
> 
>  
> If anyone has any additional ideas about this let me know. I don’t know if I 
> have the time to actually create this api so if someone likes the idea and 
> wants to develop it feel free.
> 
> 



Re: Kafka Streams for Remote Server

2016-08-03 Thread Guozhang Wang
Hi Misha,

If it works locally, then I would still suspect that it is due to a
transient timing issue: note that the create-topic script is non-blocking,
i.e. even when it returns it does not necessarily guarantee that the leader
metadata information has finished propagating to the brokers.
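If you do pre-create the topic, one way to avoid that window is to block until the
metadata reports a leader for every partition before starting the Streams application.
A rough sketch with the plain consumer (the broker address is a placeholder, topic "B"
is from the example below, and error/interrupt handling is omitted):

```
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "remote-broker:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    boolean ready = false;
    while (!ready) {
        // partitionsFor() returns the broker's current view of the topic metadata.
        List<PartitionInfo> partitions = consumer.partitionsFor("B");
        ready = partitions != null && !partitions.isEmpty()
                && partitions.stream().allMatch(p -> p.leader() != null && !p.leader().isEmpty());
        if (!ready) Thread.sleep(100);
    }
}
// ...then start the KafkaStreams instance.
```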



Guozhang


On Thu, Jul 28, 2016 at 1:48 AM, mishadoff  wrote:

> Thanks Guozhang,
>
> Yes, I rely on auto-create, and it works locally. Maybe I need to tweak
> some timeout config for that?
> Also, I noticed that even when I manually create the topic, it shows up in
> the topic list but I cannot produce messages to it; I get the same exception.
> Producing to other topics works well, so it seems like a server-side problem?
>
> — Misha
>
> > On Jul 28, 2016, at 01:44, Guozhang Wang  wrote:
> >
> > Misha,
> >
> > Did you pre-create the sink topic before starting your application or you
> > are relying on the broker-side auto-create for that topic?
> >
> > If you are relying on auto-create, then there is a transient period where
> > the topic is created but the metadata has not been propagated to the
> > brokers so they do not know they are the leader of the created topic
> > partitions yet. And I'd recommend not relying on it since it is really
> > meant for debugging environment only.
> >
> > Guozhang
> >
> >
> > On Wed, Jul 27, 2016 at 5:45 AM, mishadoff  wrote:
> >
> >> Hello,
> >>
> >> I have the simplest ever Kafka Streams application: it just reads from one
> >> Kafka topic A and writes to another topic B.
> >>
> >> When I run it in my local environment (local ZK, local Kafka broker, local
> >> Kafka Streams app) everything works fine; topic B is created and filled
> >> with messages from A.
> >> If I run it against an existing Kafka cluster (remote ZK, remote Kafka,
> >> LOCAL Kafka Streams) my app is not working anymore.
> >>
> >> It successfully reads the remote topic A, successfully processes the
> >> message, generates a producer record, and creates topic B in the remote
> >> Kafka, but during send I get an error
> >>
> >> ```
> >> 15:36:47.242 [kafka-producer-network-thread |
> >> example-message-counter3-1-StreamThread-1-producer] ERROR
> >> o.a.k.s.p.internals.RecordCollector - Error sending record: null
> >> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
> >> is not the leader for that topic-partition
> >> ```
> >>
> >> Could you point me in the right direction for where to start debugging,
> >> or what problems might cause this behaviour?
> >>
> >> Thanks,
> >> — Misha
> >
> >
> >
> >
> > --
> > -- Guozhang
>
>


-- 
-- Guozhang


Re: Kafka Streams for Remote Server

2016-07-28 Thread mishadoff
Thanks Guozhang, 

Yes, I rely on auto-create, and it works locally. Maybe I need to tweak some 
timeout config for that?
Also, I noticed that even when I manually create the topic, it shows up in the 
topic list but I cannot produce messages to it; I get the same exception.
Producing to other topics works well, so it seems like a server-side problem?

— Misha

> On Jul 28, 2016, at 01:44, Guozhang Wang  wrote:
> 
> Misha,
> 
> Did you pre-create the sink topic before starting your application or you
> are relying on the broker-side auto-create for that topic?
> 
> If you are relying on auto-create, then there is a transient period where
> the topic is created but the metadata has not been propagated to the
> brokers so they do not know they are the leader of the created topic
> partitions yet. And I'd recommend not relying on it since it is really
> meant for debugging environment only.
> 
> Guozhang
> 
> 
> On Wed, Jul 27, 2016 at 5:45 AM, mishadoff  wrote:
> 
>> Hello,
>> 
>> I have the simplest ever Kafka Streams application: it just reads from one
>> Kafka topic A and writes to another topic B.
>> 
>> When I run it in my local environment (local ZK, local Kafka broker, local
>> Kafka Streams app) everything works fine; topic B is created and filled with
>> messages from A.
>> If I run it against an existing Kafka cluster (remote ZK, remote Kafka, LOCAL
>> Kafka Streams) my app is not working anymore.
>> 
>> It successfully reads the remote topic A, successfully processes the message,
>> generates a producer record, and creates topic B in the remote Kafka, but
>> during send I get an error
>> 
>> ```
>> 15:36:47.242 [kafka-producer-network-thread |
>> example-message-counter3-1-StreamThread-1-producer] ERROR
>> o.a.k.s.p.internals.RecordCollector - Error sending record: null
>> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
>> is not the leader for that topic-partition
>> ```
>> 
>> Could you point me in the right direction for where to start debugging, or
>> what problems might cause this behaviour?
>> 
>> Thanks,
>> — Misha
> 
> 
> 
> 
> -- 
> -- Guozhang



Re: Kafka Streams for Remote Server

2016-07-27 Thread Guozhang Wang
Misha,

Did you pre-create the sink topic before starting your application or you
are relying on the broker-side auto-create for that topic?

If you are relying on auto-create, then there is a transient period where
the topic is created but the metadata has not been propagated to the
brokers so they do not know they are the leader of the created topic
partitions yet. And I'd recommend not relying on it since it is really
meant for debugging environment only.

Guozhang


On Wed, Jul 27, 2016 at 5:45 AM, mishadoff  wrote:

> Hello,
>
> I have the simplest ever Kafka Streams application: it just reads from one
> Kafka topic A and writes to another topic B.
>
> When I run it in my local environment (local ZK, local Kafka broker, local
> Kafka Streams app) everything works fine; topic B is created and filled with
> messages from A.
> If I run it against an existing Kafka cluster (remote ZK, remote Kafka, LOCAL
> Kafka Streams) my app is not working anymore.
>
> It successfully reads the remote topic A, successfully processes the message,
> generates a producer record, and creates topic B in the remote Kafka, but
> during send I get an error
>
> ```
> 15:36:47.242 [kafka-producer-network-thread |
> example-message-counter3-1-StreamThread-1-producer] ERROR
> o.a.k.s.p.internals.RecordCollector - Error sending record: null
> org.apache.kafka.common.errors.NotLeaderForPartitionException: This server
> is not the leader for that topic-partition
> ```
>
> Could you point me in the right direction for where to start debugging, or
> what problems might cause this behaviour?
>
> Thanks,
> — Misha




-- 
-- Guozhang


Re: Kafka Streams question

2016-07-14 Thread Michael Noll
Also, in the next version of Kafka / Kafka Streams such "intermediate"
topics will automatically be created for you when you do joins or
aggregations:

https://issues.apache.org/jira/browse/KAFKA-3561

So my previous message explained your options today when using the current
release of Kafka Streams (v0.10.0.0).

-Michael




On Thu, Jul 14, 2016 at 10:32 AM, Michael Noll  wrote:

> Poul,
>
> to add to what Matthias said:  If you are wondering how to manually create
> a topic, you have basically two options.
>
> A. Use Kafka's CLI tools to create the topic "from the outside".
>
> # Example
> $ kafka-topics.sh --create --topic my-custom-topic --zookeeper
> localhost:2181 --partitions 1 --replication-factor 1
>
> B. Use Kafka's API to programmatically create the topic.  See [1] for an
> example.
>
> Question for you to learn how we could perhaps improve the status quo:
>  How would you have expected this to work in the current Kafka Streams
> API?  For example, would you have expected that, say, the `through()`
> method would accept parameters to specify the number of partitions?
>
>
> Hope this helps,
> Michael
>
> [1]
> https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160
>
>
>
>
> On Thu, Jul 14, 2016 at 10:08 AM, Matthias J. Sax 
> wrote:
>
>> Hi,
>>
>> you can manually create a topic with the number of partitions you want
>> to have and use this topic via through()
>>
>> KStream input = ...
>>
>> input.map().through("manually-created-topic").join(...)
>>
>> However, both KStream and KTable need to have the same number of
>> partitions to perform the join. Thus, you might need to create a topic
>> (with the same number of partitions) for the table, too.
>>
>> See
>>
>> http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams
>>
>>
>> -Matthias
>>
>> On 07/13/2016 11:59 PM, Poul Costinsky wrote:
>> > Hi! I am prototyping some code using Kafka Streams, and have a
>> question. I need to map a stream into another (with different partition
>> key) and join it with a table. How do I control number of partitions of the
>> mapped stream?
>> >
>> > Thanks!
>> >
>> > Poul Costinsky
>> > Chief Architect
>> >
>> >  
>> > (360) 207-1753 
>> >
>> >
>> >
>> >
>> >
>>
>>
>
>
> --
>
> *Michael G. Noll | Product Manager | Confluent | +1 650.453.5860
> <%2B1%20650.453.5860>Download Apache Kafka and Confluent Platform:
> www.confluent.io/download *
>



-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
Apache Kafka and Confluent Platform: www.confluent.io/download
*


Re: Kafka Streams question

2016-07-14 Thread Michael Noll
Poul,

to add to what Matthias said:  If you are wondering how to manually create
a topic, you have basically two options.

A. Use Kafka's CLI tools to create the topic "from the outside".

# Example
$ kafka-topics.sh --create --topic my-custom-topic --zookeeper
localhost:2181 --partitions 1 --replication-factor 1

B. Use Kafka's API to programmatically create the topic.  See [1] for an
example.
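To give a rough idea of option B, a minimal sketch using the AdminClient from newer
clients (the linked example [1] uses the older AdminUtils; broker address, topic name,
and counts are placeholders, and error handling is omitted):

```
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // 4 partitions, replication factor 1 -- pick these to match your co-partitioning needs.
    NewTopic topic = new NewTopic("my-custom-topic", 4, (short) 1);
    admin.createTopics(Collections.singletonList(topic)).all().get();
}
```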

Question for you to learn how we could perhaps improve the status quo:  How
would you have expected this to work in the current Kafka Streams API?  For
example, would you have expected that, say, the `through()` method would
accept parameters to specify the number of partitions?


Hope this helps,
Michael

[1]
https://github.com/confluentinc/examples/blob/kafka-0.10.0.0-cp-3.0.0/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbedded.java#L133-L160




On Thu, Jul 14, 2016 at 10:08 AM, Matthias J. Sax 
wrote:

> Hi,
>
> you can manually create a topic with the number of partitions you want
> to have and use this topic via through()
>
> KStream input = ...
>
> input.map().through("manually-created-topic").join(...)
>
> However, both KStream and KTable need to have the same number of
> partitions to perform the join. Thus, you might need to create a topic
> (with the same number of partitions) for the table, too.
>
> See
> http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams
>
>
> -Matthias
>
> On 07/13/2016 11:59 PM, Poul Costinsky wrote:
> > Hi! I am prototyping some code using Kafka Streams, and have a question.
> I need to map a stream into another (with different partition key) and join
> it with a table. How do I control number of partitions of the mapped stream?
> >
> > Thanks!
> >
> > Poul Costinsky
> > Chief Architect
> >
> >  
> > (360) 207-1753 
> >
> >
> >
> >
> >
>
>


-- 

*Michael G. Noll | Product Manager | Confluent | +1 650.453.5860Download
Apache Kafka and Confluent Platform: www.confluent.io/download
*


Re: Kafka Streams question

2016-07-14 Thread Matthias J. Sax
Hi,

you can manually create a topic with the number of partitions you want
to have and use this topic via through()

KStream input = ...

input.map().through("manually-created-topic").join(...)
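A slightly fuller sketch of the same idea, written against the newer StreamsBuilder API
(default String serdes assumed; "manually-created-topic" must be created with the same
number of partitions as the table's topic, and the re-keying logic is just an example):

```
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> input = builder.stream("input-topic");
KTable<String, String> table = builder.table("lookup-topic");

// Re-key the stream, then write through the manually created topic to
// re-establish co-partitioning with the table.
KStream<String, String> rekeyed =
        input.map((key, value) -> KeyValue.pair(value.split(",")[0], value));
KStream<String, String> repartitioned = rekeyed.through("manually-created-topic");

KStream<String, String> joined =
        repartitioned.join(table, (streamValue, tableValue) -> streamValue + "|" + tableValue);
```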

However, both KStream and KTable need to have the same number of
partitions to perform the join. Thus, you might need to create a topic
(with the same number of partitions) for the table, too.

See
http://docs.confluent.io/3.0.0/streams/developer-guide.html#joining-streams


-Matthias

On 07/13/2016 11:59 PM, Poul Costinsky wrote:
> Hi! I am prototyping some code using Kafka Streams, and have a question. I 
> need to map a stream into another (with different partition key) and join it 
> with a table. How do I control number of partitions of the mapped stream?
> 
> Thanks! 
> 
> Poul Costinsky
> Chief Architect
> 
>  
> (360) 207-1753 
> 
> 
> 
> 
> 





Re: Kafka Streams - production use

2016-07-06 Thread Michael Noll
Dirk,

we included the note "be careful when using Kafka Streams in production"
because Kafka Streams as shipped in Kafka 0.10.0.0 is the first-ever
release of Kafka Streams.  In practice, users are running Streams
applications in a variety of stages -- some are doing pilots or
evaluations, some are already running it in production.

The 0.10.x release line will see bug fixes and improvements (as you'd
expect) but may also bring bigger new functionality such as Queryable State
[1].

Hope this helps,
Michael



[1]
https://cwiki.apache.org/confluence/display/KAFKA/KIP-67%3A+Queryable+state+for+Kafka+Streams






On Mon, Jul 4, 2016 at 11:14 AM, Dirk Jeßberger 
wrote:

> Hello,
>
> According to the FAQ (http://docs.confluent.io/3.0.0/streams/faq.html)
> it is not recommended to use Kafka Streams in production. When do you
> expect it to be ready for production use? Within the 0.10.x.x line
> or with later versions?
>
> Best regards
> Dirk Jeßberger
>
> --
> Dirk Jeßberger
> Senior Software Developer
> Homepages & Search Products Development
> 1&1 Mail & Media Development & Technology GmbH | Brauerstraße 48 | 76135
> Karlsruhe | Germany
> E-Mail: dirk.jessber...@1und1.de  |
> Web: www.1und1.de 
> Amtsgericht Montabaur, HRB 5452
>
> Geschäftsführer: Frank Einhellinger, Thomas Ludwig, Jan Oetjen
> Member of United Internet
>


Re: [Kafka Streams] Source Nodes

2016-06-29 Thread mishadoff
Thanks Matthias!

Got it working with through(); still using the 0.10.0.0 version.

> On Jun 29, 2016, at 22:42, Matthias J. Sax  wrote:
> 
> Hi,
> 
> for joins, data of both inputs must be co-located, ie, partitioned on
> the same key and have the same number of partitions:
> 
> See "Note" box at:
> http://docs.confluent.io/3.0.0/streams/developer-guide.html?highlight=join#joining-streams
> 
> From an older email thread about the same issue:
> 
 If you change the key, your partitioning changes, ie, is not valid anymore.
 Thus, the joins (which assumes co-located data) cannot be performed
 (this is the reason why sources get set to null). You can write to an
 intermediate topic via .through(...) to get a valid partitioning:
 
 KStream dataStream = builder.stream(...).map(...).through(...);
 
> 
> Btw: this problem got fixed already; if you use current trunk version
> you do not need the additional call to through(...)
> 
> -Matthias
> 
> 
> On 06/29/2016 06:02 PM, mishadoff wrote:
>> Hey,
>> 
>> I am trying to understand kafka-streams and doing a simple prototype for 
>> joining KStream with KTable, but stuck at the error:
>> 
>> Invalid topology building: KSTREAM-MAP-01 and 
>> KSTREAM-AGGREGATE-04 are not joinable
>> 
>> I tracked down the issue is thrown where stream or table have set 
>> sourceNodes to null, and confirmed both of them are null in my application.
>> 
>> Interesting, when I initially read kafka topic into KStream, sourceNodes are 
>> not null, but later after doing simpel map operation (to exclude unneded 
>> fields an apply conversion) source nodes are deleted.
>> 
>> Could someone clarify what sourceNodes are needed for and why they erased 
>> after map?
>> 
>> Thanks!
>> 
>> — Misha
>> 
> 



Re: [Kafka Streams] Source Nodes

2016-06-29 Thread Matthias J. Sax
Hi,

for joins, data of both inputs must be co-located, ie, partitioned on
the same key and have the same number of partitions:

See "Note" box at:
http://docs.confluent.io/3.0.0/streams/developer-guide.html?highlight=join#joining-streams

From an older email thread about the same issue:

>>> If you change the key, your partitioning changes, ie, is not valid anymore.
>>> Thus, the joins (which assumes co-located data) cannot be performed
>>> (this is the reason why sources get set to null). You can write to an
>>> intermediate topic via .through(...) to get a valid partitioning:
>>>
>>> KStream dataStream = builder.stream(...).map(...).through(...);
>>>

Btw: this problem got fixed already; if you use current trunk version
you do not need the additional call to through(...)

-Matthias


On 06/29/2016 06:02 PM, mishadoff wrote:
> Hey,
> 
> I am trying to understand kafka-streams and doing a simple prototype for 
> joining KStream with KTable, but stuck at the error:
> 
> Invalid topology building: KSTREAM-MAP-01 and 
> KSTREAM-AGGREGATE-04 are not joinable
> 
> I tracked down that the error is thrown when the stream or the table has its 
> sourceNodes set to null, and confirmed both of them are null in my application.
> 
> Interestingly, when I initially read the Kafka topic into a KStream, sourceNodes 
> is not null, but after a simple map operation (to exclude unneeded fields and 
> apply a conversion) the source nodes are gone.
> 
> Could someone clarify what sourceNodes are needed for and why they are erased 
> after map?
> 
> Thanks!
> 
> — Misha
> 





Re: Kafka Streams aggregation store

2016-06-08 Thread Eno Thereska
Hi Alexander,

I haven't tried Kafka Streams on Windows but did notice that Microsoft has 
merged code into github to make RocksDB available on Windows. Perhaps this is 
useful:
https://blogs.msdn.microsoft.com/bingdevcenter/2015/07/22/open-source-contribution-from-bing-rocksdb-is-now-available-in-windows-platform/

Thanks,
Eno

> On 8 Jun 2016, at 19:47, Alexander Jipa  wrote:
> 
> Hello,
> According to 
> http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple:
> “In terms of implementation Kafka Streams stores this derived aggregation in 
> a local embedded key-value store (RocksDB by default, but you can plug in 
> anything).”
> 
> So I tried running the word count example on my Windows machine (for a local 
> test) and got an error because RocksDB is not available for Windows.
> I thought it would be easy to switch to an in-memory store.
> But after a while I figured out that the KStream aggregation implementation 
> doesn’t allow that.
> It looks like aggregateByKey (and thus countByKey) always uses a 
> persistent store.
> Moreover, it looks like there’s no way to change the default persistent 
> store…
> 
> Even though I was more or less able to achieve the goal by manually wiring 
> up a Source, a Producer and a Sink, it doesn’t make for easy coding…
> 
> The questions that I have are:
> -  Is there a plan of providing a persistent store support for Kafka 
> Streams on Windows?
> -  Is there a plan of providing KStream API to specify a custom 
> store/factory for aggregations?
> -  Is there a way of changing the default persistent store from 
> RocksDB?
> 
> Best Regards,
> Alexander Jipa



Re: Kafka Streams file handle leak

2016-04-25 Thread Greg Fodor
Great, thanks:

https://issues.apache.org/jira/browse/KAFKA-3619


On Mon, Apr 25, 2016 at 11:13 AM, Guozhang Wang  wrote:
> Thanks for reporting that Greg.
>
> I have just added you as a Kafka contributor so you should be able to
> create JIRAs now. Could you create a new JIRA for this and change the RB
> title with the JIRA number?
>
> Thanks!
>
> Guozhang
>
> On Mon, Apr 25, 2016 at 12:13 AM, Greg Fodor  wrote:
>
>> Apologies for a second posting to the list, but the ASF JIRA is
>> preventing me from creating KAFKA- tickets so I wanted to report this
>> issue. This seems to be a critical bug with KStreams, the .lock files
>> in the state store directories do not seem to be having their file
>> handles freed (despite the locks being freed), so on a complex job the
>> number of file handles in use goes up rapidly as the locks are taken
>> for the cleanup routine at the end of the thread run loop. I've
>> created a PR that seems to resolve this issue:
>>
>> https://github.com/apache/kafka/pull/1267
>>
>
>
>
> --
> -- Guozhang


Re: Kafka Streams file handle leak

2016-04-25 Thread Guozhang Wang
Thanks for reporting that Greg.

I have just added you as a Kafka contributor so you should be able to
create JIRAs now. Could you create a new JIRA for this and change the RB
title with the JIRA number?

Thanks!

Guozhang

On Mon, Apr 25, 2016 at 12:13 AM, Greg Fodor  wrote:

> Apologies for a second posting to the list, but the ASF JIRA is
> preventing me from creating KAFKA- tickets so I wanted to report this
> issue. This seems to be a critical bug with KStreams, the .lock files
> in the state store directories do not seem to be having their file
> handles freed (despite the locks being freed), so on a complex job the
> number of file handles in use goes up rapidly as the locks are taken
> for the cleanup routine at the end of the thread run loop. I've
> created a PR that seems to resolve this issue:
>
> https://github.com/apache/kafka/pull/1267
>



-- 
-- Guozhang


Re: Kafka Streams - read topic from beginning

2016-04-21 Thread Guozhang Wang
Hello Maria,

We have some thoughts about supporting finer-grained flow control in Kafka
Streams (https://issues.apache.org/jira/browse/KAFKA-3478) as part of a bigger
effort to improve the re-processing user experience, which covers this use
case. We are shooting to have this post 0.10.0.0.

For now, one work-around I can think of is that upon restart /
re-processing you can delete the committed offsets through an admin request
(look at ConsumerGroupCommand).
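For example, a rough sketch of such a reset with the newer plain consumer API, run
while the Streams application is stopped (the group id must equal the application.id;
topic, broker, and id below are placeholders, and error handling is omitted). Note that
this only rewinds the committed offsets of the source topic; local state stores and
internal changelog topics are a separate concern.

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");  // same value as application.id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
    List<TopicPartition> partitions = consumer.partitionsFor("source-topic").stream()
            .map(p -> new TopicPartition(p.topic(), p.partition()))
            .collect(Collectors.toList());
    consumer.assign(partitions);
    consumer.seekToBeginning(partitions);

    Map<TopicPartition, OffsetAndMetadata> earliest = new HashMap<>();
    for (TopicPartition tp : partitions) {
        earliest.put(tp, new OffsetAndMetadata(consumer.position(tp)));  // position() forces the seek
    }
    consumer.commitSync(earliest);  // overwrite the group's committed offsets
}
```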


Guozhang


On Thu, Apr 21, 2016 at 6:19 AM, Maria Abramiuc 
wrote:

> Kafka Streams look great, but there is one thing I don't seem to find a way
> to do:
>
> - read a topic from the beginning even if there is an offset saved:
>
>  I have :
>
>  props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>
> this works as described if there is no offset saved.
>
>   For a normal consumer we have:
>
>   seekToBeginning
>
>
> In KafkaConsumer:
>
> Line 132:
>
> NetworkClient netClient = new NetworkClient(new
> Selector(config.getLong("connections.max.idle.ms").longValue(),
> this.metrics, this.time, metricGrpPrefix, metricsTags,
> channelBuilder), this.metadata, this.clientId, 100,
> config.getLong("reconnect.backoff.ms").longValue(),
> config.getInt("send.buffer.bytes").intValue(),
> config.getInt("receive.buffer.bytes").intValue(),
> config.getInt("request.timeout.ms").intValue(), this.time);
> this.client = new ConsumerNetworkClient(netClient, this.metadata,
> this.time, this.retryBackoffMs);
> OffsetResetStrategy offsetResetStrategy =
>
> OffsetResetStrategy.valueOf(config.getString("auto.offset.reset").toUpperCase());
> this.subscriptions = new SubscriptionState(offsetResetStrategy);
>
> I can't find any way to set the consumer using StreamsConfig
> properties to seekToBeginning or to set subscriptionState to need
> offset reset.
>
>
>
> *Is there a way to force the consumption of a topic from the beginning using
> Kafka Streams?*
>
>
> Thank you for all the support provided,
>
> Maria Abramiuc
>



-- 
-- Guozhang


Re: kafka-streams: Using custom partitioner with higher level DSL constructs?

2016-03-21 Thread Guozhang Wang
Also, have you looked at Kafka Connect, released in 0.9? There is a MySQL
binlog connector implementation in progress; I was just thinking you might
be interested in checking it out and seeing if there is any feedback you
want to give.

https://github.com/wushujames/kafka-mysql-connector

Guozhang

On Fri, Mar 18, 2016 at 7:38 AM, Ben Osheroff 
wrote:

> (lemme know if this belongs on the users email list, I'm not sure where
> API questions fall)
>
> Hi, I'm Ben Osheroff, I wrote Maxwell
> (http://github.com/zendesk/maxwell) and have been prototyping an engine
> to do arbitrary denormalizations of Maxwell's CDC events based on the
> kafka-streams library; the elevator pitch is that you can write SQL
> joins which the engine compiles down to stream-joins and aggregations
> and such.
>
> Maxwell partitions its stream by mysql database name, which means that
> to do stream-joins I need to implement the same (custom) partitioning
> algorithm somewhere in my stream processor.  I'd prefer not to drop down to
> the lower-level `addSink()` library calls if possible, and I can't
> figure out how to mix and match the lower-level calls with the higher-level
> DSL (map/filter/etc).
>
> So I guess I have two questions:
>
> 1. Is it somehow possible to add a custom `Sink` to an otherwise high
> level stream topology?  There's no obvious way to retrieve the topology
> names that I can see.
>
> 2. If not, I'd like to make a feature request that the various stream
> building functions (.to, .through) accept an optional
> StreamPartitioner.
>
> 3. Any other ideas about how to pull this off?
>
> Thanks!
>
>
> - Ben Osheroff
> zendesk.com
>



-- 
-- Guozhang


Re: kafka-streams: Using custom partitioner with higher level DSL constructs?

2016-03-21 Thread Guozhang Wang
Hello Ben,

1. Currently the Kafka Streams high-level DSL does not take a
StreamPartitioner yet; please feel free to file a JIRA so that we can keep
track and discuss whether / how to incorporate it into the Kafka Streams DSL.

2. For now, there are two work-arounds:

1) Use the `process()` function to manually write to Kafka with customized
partitioning:

RecordCollector collector = ((RecordCollector.Supplier)
context).recordCollector();
collector.send(...);


2) You can call addSink() on KStreamBuilder as well, since it extends
TopologyBuilder, but you need to know the upstream processor name (i.e.
the parent processor name), which is auto-generated in KStreamImpl as
"ProcessorType-IndexSuffix" -- so this is a bit hacky.

Guozhang


On Fri, Mar 18, 2016 at 7:38 AM, Ben Osheroff 
wrote:

> (lemme know if this belongs on the users email list, I'm not sure where
> API questions fall)
>
> Hi, I'm Ben Osheroff, I wrote Maxwell
> (http://github.com/zendesk/maxwell) and have been prototyping an engine
> to do arbitrary denormalizations of Maxwell's CDC events based on the
> kafka-streams library; the elevator pitch is that you can write SQL
> joins which the engine compiles down to stream-joins and aggregations
> and such.
>
> Maxwell partitions its stream by mysql database name, which means that
> to do stream-joins I need to implement the same (custom) partitioning
> algorithm somewhere in my stream processor.  I'd prefer not to drop down to
> the lower-level `addSink()` library calls if possible, and I can't
> figure out how to mix and match the lower-level calls with the higher-level
> DSL (map/filter/etc).
>
> So I guess I have two questions:
>
> 1. Is it somehow possible to add a custom `Sink` to an otherwise high
> level stream topology?  There's no obvious way to retrieve the topology
> names that I can see.
>
> 2. If not, I'd like to make a feature request that the various stream
> building functions (.to, .through) accept an optional
> StreamPartitioner.
>
> 3. Any other ideas about how to pull this off?
>
> Thanks!
>
>
> - Ben Osheroff
> zendesk.com
>



-- 
-- Guozhang


Re: kafka streams

2016-03-05 Thread Guozhang Wang
Thanks Bill,

The example code looks great! I will read through it, and am expecting your
high-level examples already!

Guozhang

On Sat, Mar 5, 2016 at 12:39 PM, Bill Bejeck  wrote:

> Oops,  forgot that little bit of information  -
> http://codingjunkie.net/kafka-processor-part1/
>
> A review would be greatly appreciated to make sure I'm on track.
>
> Thanks
>
> On Sat, Mar 5, 2016 at 3:12 PM, Neha Narkhede  wrote:
>
> > Great. Mind sharing the draft or linking to the post if already
> published?
> > Happy to review if that is useful.
> >
> > On Sat, Mar 5, 2016 at 12:07 PM, Bill Bejeck  wrote:
> >
> > > All,
> > >
> > > I'm pretty excited about kafka-streams and I've written a blog (first
> of
> > > 2-3) on the subject, and I'd thought I'd share.
> > >
> > > Thanks,
> > > Bill
> > >
> >
> >
> >
> > --
> > Thanks,
> > Neha
> >
>



-- 
-- Guozhang


Re: kafka streams

2016-03-05 Thread Bill Bejeck
Oops,  forgot that little bit of information  -
http://codingjunkie.net/kafka-processor-part1/

A review would be greatly appreciated to make sure I'm on track.

Thanks

On Sat, Mar 5, 2016 at 3:12 PM, Neha Narkhede  wrote:

> Great. Mind sharing the draft or linking to the post if already published?
> Happy to review if that is useful.
>
> On Sat, Mar 5, 2016 at 12:07 PM, Bill Bejeck  wrote:
>
> > All,
> >
> > I'm pretty excited about kafka-streams and I've written a blog (first of
> > 2-3) on the subject, and I'd thought I'd share.
> >
> > Thanks,
> > Bill
> >
>
>
>
> --
> Thanks,
> Neha
>


Re: kafka streams

2016-03-05 Thread Neha Narkhede
Great. Mind sharing the draft or linking to the post if already published?
Happy to review if that is useful.

On Sat, Mar 5, 2016 at 12:07 PM, Bill Bejeck  wrote:

> All,
>
> I'm pretty excited about kafka-streams and I've written a blog (first of
> 2-3) on the subject, and I'd thought I'd share.
>
> Thanks,
> Bill
>



-- 
Thanks,
Neha