[jira] [Created] (KAFKA-13713) Tech Debt: keep StreamThread and TopologyMetadata's view of the topology in sync

2022-03-06 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13713:
--

 Summary: Tech Debt: keep StreamThread and TopologyMetadata's view 
of the topology in sync
 Key: KAFKA-13713
 URL: https://issues.apache.org/jira/browse/KAFKA-13713
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


Currently, when the topology is modified via an add/remove request, we 
immediately update the TopologyMetadata with the new/removed topology, and then 
register listeners for each request so we can complete it once all threads have 
acked the corresponding update, i.e. upgraded to that minimum topology version.

 For consistency, we should consider keeping the topology at the minimum common 
version across all (live/active group member) threads. Once a thread notices a 
topology update has been queued, it updates its own view and bumps it to the 
latest topology version. We then check whether the minimum common topology 
version has increased, and if so we upgrade the official topology as tracked by 
the TopologyMetadata.
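
A rough sketch of the intended bookkeeping (names and structure here are 
illustrative only, not the actual TopologyMetadata code):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch of minimum-common-version tracking.
    class TopologyVersionTracker {
        private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
        private long committedVersion = 0L;

        // Each StreamThread calls this after applying a queued topology update.
        synchronized void ackVersion(final String threadName, final long version) {
            threadVersions.put(threadName, version);
            final long minCommon = threadVersions.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(committedVersion);
            // Only upgrade the "official" topology once every live thread has
            // caught up to at least this version.
            if (minCommon > committedVersion) {
                committedVersion = minCommon;
                // ...complete any add/remove listeners registered at or below
                // the newly committed version...
            }
        }
    }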





[jira] [Created] (KAFKA-13712) Make topology addition/removal atomic so we can roll back if request fails

2022-03-06 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13712:
--

 Summary: Make topology addition/removal atomic so we can roll back 
if request fails
 Key: KAFKA-13712
 URL: https://issues.apache.org/jira/browse/KAFKA-13712
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman








[jira] [Created] (KAFKA-13711) Fix bugs with input topic management to support pattern subscription fully

2022-03-06 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13711:
--

 Summary: Fix bugs with input topic management to support pattern 
subscription fully
 Key: KAFKA-13711
 URL: https://issues.apache.org/jira/browse/KAFKA-13711
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: A. Sophie Blee-Goldman


See https://github.com/apache/kafka/pull/11601





Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-06 Thread Luke Chen
Hi Sergio,

Thanks for your explanation.
Makes sense to me.

> One interesting thing I have just found is that *max-message-size* is not
> used when dumping logs; instead it is only used by dumpIndex

Are you interested in expanding the scope of this KIP so that
*max-message-size* also applies when dumping logs?
I think it would be good, since it is also a small change and there would be no
need to go through another KIP discussion/voting process. But I'm fine if you
want to keep this KIP as is, and create another JIRA ticket for future work.

Thank you.
Luke

On Mon, Mar 7, 2022 at 6:02 AM Sergio Daniel Troiano wrote:

> hey Luke,
>
> Let me answer them:
> 1. If the *max-batches-size* is too small that results in no records
> output, will we output any information to the user?
>
> If *max-batches-size* is even smaller than the first batch, then there
> won't be any output. This is handled by the FileRecords class; I think this
> is correct, as it is the expected behaviour.
>
> 2. After your explanation, I guess the use of *max-batches-size* won't
> conflict with *max-message-size*, right?
>
> One interesting thing I have just found is that *max-message-size* is not
> used when dumping logs; instead it is only used by dumpIndex
> <
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala#L150
> >
> so this feature is not working for dumping logs. I also checked whether
> there is a unit test for this, and there is not. Maybe we could create a
> ticket for this?
>
> Regards.
>
>
> > On Sat, 5 Mar 2022 at 10:36, Luke Chen wrote:
>
> > Hi Sergio,
> >
> > Thanks for the explanation! Very clear!
> > I think we should put this example and explanation into KIP.
> >
> > Other comments:
> > 1. If the *max-batches-size* is too small that results in no records
> > output, will we output any information to the user?
> > 2. After your explanation, I guess the use of *max-batches-size* won't
> > conflict with *max-message-size*, right?
> > That is, the user can set the two arguments at the same time. Is that correct?
> >
> > Thank you.
> > Luke
> >
> > On Sat, Mar 5, 2022 at 4:47 PM Sergio Daniel Troiano wrote:
> >
> > > hey Luke,
> > >
> > > Thanks for the interest, it is a good question, please let me explain:
> > >
> > > *max-message-size* is a filter on the size of each batch. For example,
> > > if I set --max-message-size 1000 bytes and my segment log has 300
> > > batches, 150 of them with a size of 500 bytes and the other 150 with a
> > > size of 2000 bytes, then the script will skip the last 150, as each of
> > > those batches is larger than the limit.
> > >
> > > On the other hand, following the same example with *max-batches-size*
> > > set to 1000 bytes, it will only print out the first 2 batches (500
> > > bytes each) and stop. This avoids reading the whole file.
> > >
> > > Also, if all of them are smaller than 1000 bytes, it will end up
> > > printing out all the batches.
> > > The idea of my change is to limit the *amount* of batches, no matter
> > > their size.
> > >
> > > I hope this reply helps.
> > > Best regards.
> > >
> > > On Sat, 5 Mar 2022 at 08:00, Luke Chen wrote:
> > >
> > > > Hi Sergio,
> > > >
> > > > Thanks for the KIP!
> > > >
> > > > One question:
> > > > I saw there's a `max-message-size` argument that seems to do the
> > > > same thing as you want.
> > > > Could you help explain what's the difference between
> > > > `max-message-size` and `max-batches-size`?
> > > >
> > > > Thank you.
> > > > Luke
> > > >
> > > > On Sat, Mar 5, 2022 at 3:21 AM Kirk True wrote:
> > > >
> > > > > Hi Sergio,
> > > > >
> > > > > Thanks for the KIP. I don't know anything about the log segment
> > > > internals,
> > > > > but the logic and implementation seem sound.
> > > > >
> > > > > Three questions:
> > > > >  1. Since the --max-batches-size unit is bytes, does it matter if
> > > > > that size doesn't align to a record boundary?
> > > > >  2. Can you add a check to make sure that --max-batches-size doesn't
> > > > > allow the user to pass in a negative number?
> > > > >  3. Can you add/update any unit tests related to the DumpLogSegments
> > > > > arguments?
> > > > > Thanks,
> > > > > Kirk
> > > > >
> > > > > On Thu, Mar 3, 2022, at 1:32 PM, Sergio Daniel Troiano wrote:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-824%3A+Allowing+dumping+segmentlogs+limiting+the+batches+in+the+output


Re: [VOTE] 3.0.1 RC0

2022-03-06 Thread Jakub Scholz
+1 (non-binding). I used the staged Scala 2.13 binaries and the staging
Maven repository to run my tests. All seems to work fine, no issues found.

Thanks
Jakub

On Thu, Mar 3, 2022 at 7:05 PM Mickael Maison wrote:

> Hello Kafka users, developers and client-developers,
>
> This is the first candidate for release of Apache Kafka 3.0.1.
>
> Apache Kafka 3.0.1 is a bugfix release and 29 issues have been fixed
> since 3.0.0.
>
> Release notes for the 3.0.1 release:
> https://home.apache.org/~mimaison/kafka-3.0.1-rc0/RELEASE_NOTES.html
>
> *** Please download, test and vote by Thursday, March 10, 6pm GMT ***
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> https://kafka.apache.org/KEYS
>
> * Release artifacts to be voted upon (source and binary):
> https://home.apache.org/~mimaison/kafka-3.0.1-rc0/
>
> * Maven artifacts to be voted upon:
> https://repository.apache.org/content/groups/staging/org/apache/kafka/
>
> * Javadoc:
> https://home.apache.org/~mimaison/kafka-3.0.1-rc0/javadoc/
>
> * Tag to be voted upon (off 3.0 branch) is the 3.0.1 tag:
> https://github.com/apache/kafka/releases/tag/3.0.1-rc0
>
> * Documentation:
> https://kafka.apache.org/30/documentation.html
>
> * Protocol:
> https://kafka.apache.org/30/protocol.html
>
> * Successful Jenkins builds for the 3.0 branch:
> I'll share a link once the build completes
>
> /**
>
> Thanks,
> Mickael
>


Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-06 Thread Sergio Daniel Troiano
hey Luke,

Let me answer them:
1. If the *max-batches-size* is too small that results in no records
output, will we output any information to the user?

If *max-batches-size* is even smaller than the first batch, then there won't
be any output. This is handled by the FileRecords class; I think this is
correct, as it is the expected behaviour.
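
For illustration, the cut-off behaves roughly like the following sketch
(hypothetical names only, not the actual FileRecords/DumpLogSegments code):

    import java.util.List;

    // Hypothetical sketch of the byte-budget cut-off over batches.
    class MaxBatchesSizeSketch {
        record Batch(int sizeInBytes) { }

        static void dump(final List<Batch> batches, final int maxBatchesSize) {
            int remaining = maxBatchesSize;
            for (final Batch batch : batches) {
                // Stop at the first batch that no longer fits in the budget;
                // if the very first batch is already too large, nothing at
                // all is printed.
                if (batch.sizeInBytes() > remaining) {
                    break;
                }
                remaining -= batch.sizeInBytes();
                System.out.println("batch of " + batch.sizeInBytes() + " bytes");
            }
        }
    }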

2. After your explanation, I guess the use of *max-batches-size* won't
conflict with *max-message-size*, right?

One interesting thing I have just found is that *max-message-size* is not used
when dumping logs; instead it is only used by dumpIndex
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/DumpLogSegments.scala#L150),
so this feature is not working for dumping logs. I also checked whether there
is a unit test for this, and there is not. Maybe we could create a ticket for
this?

Regards.


On Sat, 5 Mar 2022 at 10:36, Luke Chen wrote:

> Hi Sergio,
>
> Thanks for the explanation! Very clear!
> I think we should put this example and explanation into KIP.
>
> Other comments:
> 1. If the *max-batches-size* is too small that results in no records
> output, will we output any information to the user?
> 2. After your explanation, I guess the use of *max-batches-size* won't
> conflict with *max-message-size*, right?
> That is, the user can set the two arguments at the same time. Is that correct?
>
> Thank you.
> Luke
>
> On Sat, Mar 5, 2022 at 4:47 PM Sergio Daniel Troiano wrote:
>
> > hey Luke,
> >
> > Thanks for the interest, it is a good question, please let me explain:
> >
> > *max-message-size* is a filter on the size of each batch. For example, if
> > I set --max-message-size 1000 bytes and my segment log has 300 batches,
> > 150 of them with a size of 500 bytes and the other 150 with a size of
> > 2000 bytes, then the script will skip the last 150, as each of those
> > batches is larger than the limit.
> >
> > On the other hand, following the same example with *max-batches-size* set
> > to 1000 bytes, it will only print out the first 2 batches (500 bytes each)
> > and stop. This avoids reading the whole file.
> >
> > Also, if all of them are smaller than 1000 bytes, it will end up printing
> > out all the batches.
> > The idea of my change is to limit the *amount* of batches, no matter their
> > size.
> >
> > I hope this reply helps.
> > Best regards.
> >
> > On Sat, 5 Mar 2022 at 08:00, Luke Chen wrote:
> >
> > > Hi Sergio,
> > >
> > > Thanks for the KIP!
> > >
> > > One question:
> > > I saw there's a `max-message-size` argument that seems to do the same
> > > thing as you want.
> > > Could you help explain what's the difference between `max-message-size`
> > > and `max-batches-size`?
> > >
> > > Thank you.
> > > Luke
> > >
> > > On Sat, Mar 5, 2022 at 3:21 AM Kirk True wrote:
> > >
> > > > Hi Sergio,
> > > >
> > > > Thanks for the KIP. I don't know anything about the log segment
> > > internals,
> > > > but the logic and implementation seem sound.
> > > >
> > > > Three questions:
> > > >  1. Since the --max-batches-size unit is bytes, does it matter if that
> > > > size doesn't align to a record boundary?
> > > >  2. Can you add a check to make sure that --max-batches-size doesn't
> > > > allow the user to pass in a negative number?
> > > >  3. Can you add/update any unit tests related to the DumpLogSegments
> > > > arguments?
> > > > Thanks,
> > > > Kirk
> > > >
> > > > On Thu, Mar 3, 2022, at 1:32 PM, Sergio Daniel Troiano wrote:
> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-824%3A+Allowing+dumping+segmentlogs+limiting+the+batches+in+the+output


Re: [DISCUSS] KIP-824 Allowing dumping segmentlogs limiting the batches in the output

2022-03-06 Thread Sergio Daniel Troiano
hey Kirk,

Thanks for the questions, please let me answer them:

1. This is handled by the *FileRecords* class: open now uses the slice
method, which takes care of the end bytes. There will be some trailing bytes,
which is totally fine; it was expected even before my proposal. I added an
extra conditional to avoid printing the warning in the main script.

2. The *FileRecords* class already checks the value, and we do the same; it
is a similar case to when we set the maxMessageSizeOpt.
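
A minimal sketch of such a non-negative check (hypothetical names, not the
actual option-parsing code):

    // Hypothetical sketch of the non-negative argument check.
    class MaxBatchesSizeOption {
        static int parse(final String raw) {
            final int value = Integer.parseInt(raw);
            if (value < 0) {
                throw new IllegalArgumentException(
                    "--max-batches-size must be a non-negative number of bytes, got: " + value);
            }
            return value;
        }
    }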



3. I have just added the unit test.




On Fri, 4 Mar 2022 at 20:21, Kirk True wrote:

> Hi Sergio,
>
> Thanks for the KIP. I don't know anything about the log segment internals,
> but the logic and implementation seem sound.
>
> Three questions:
>  1. Since the --max-batches-size unit is bytes, does it matter if that
> size doesn't align to a record boundary?
>  2. Can you add a check to make sure that --max-batches-size doesn't allow
> the user to pass in a negative number?
>  3. Can you add/update any unit tests related to the DumpLogSegments
> arguments?
> Thanks,
> Kirk
>
> On Thu, Mar 3, 2022, at 1:32 PM, Sergio Daniel Troiano wrote:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-824%3A+Allowing+dumping+segmentlogs+limiting+the+batches+in+the+output


Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-06 Thread John Roesler
Hello all,

It seems like we're making good progress on this discussion.
If I'm keeping track correctly, if we can resolve this
question about how to handle processValues(), then we should
be able to finalize the vote, right?

I share Matthias's preference for having a type-safe API.

Just as a reminder, the current approach with transformers
is NOT enforced at compile time. Transformers have access to
a "forwarding disabled" processor context, which still has
the forward methods that throw a runtime exception when
invoked.
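
As an illustration of that pattern (a sketch of the idea only, not the actual
Streams code):

    // Illustrative sketch of a "forwarding disabled" context: forward() still
    // exists at compile time but always fails at runtime.
    class ForwardingDisabledContextSketch {
        public <K, V> void forward(final K key, final V value) {
            throw new UnsupportedOperationException(
                "forward() is not supported from this context; "
                + "the framework must be able to guarantee the key is unchanged");
        }
    }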

However, the spirit of the "new processor api" line of work
is to clean up a lot of the cruft around the original
processor API, so this is a good opportunity to introduce a
type-safe version if we can.

Based on my experience adding the new processor API, I felt
like it should be possible to do what he suggests, but it
would be more involved than what he said. The biggest thing
I learned from that effort, though, is that you really have
to just try it to see what all the complications are.

With that in mind, I went ahead and implemented the
suggestion: https://github.com/apache/kafka/pull/11854

This is a functional prototype. It only adds processValues,
which takes a supplier of a new type, FixedKeyProcessor.
That processor only processes FixedKeyRecords, which have a
key that cannot be changed. FixedKeyProcessors have a
special context, a FixedKeyProcessorContext, which can only
forward FixedKeyRecords.

FixedKeyRecords have "fixed keys" because their key can only
be set in the constructor, and the constructor is package-
private.

As you can see, this new record/processor/context ecosystem
is an independent peer of the general one. This is necessary
to ensure the desired compiler check. For example, if
FixedKeyRecord were merely an interface implemented by
Record, then users could create a new Record with a new key
and forward it as a FixedKeyRecord, violating the
constraint.
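
In outline, the shape is roughly the following (a simplified sketch; the real
classes in the PR differ in detail):

    // Simplified sketch of the idea, not the actual classes in the PR.
    public final class FixedKeyRecord<K, V> {
        private final K key;
        private final V value;
        private final long timestamp;

        // Package-private constructor: only the framework can create
        // instances, so user code has no way to re-key a record.
        FixedKeyRecord(final K key, final V value, final long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        public K key() { return key; }
        public V value() { return value; }
        public long timestamp() { return timestamp; }

        // The value can be replaced, but there is deliberately no withKey().
        public <NewV> FixedKeyRecord<K, NewV> withValue(final NewV newValue) {
            return new FixedKeyRecord<>(key, newValue, timestamp);
        }
    }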

As I said, with this proposal, the devil is in the details,
so if anyone thinks the API can be simplified, I suggest you
check out the branch and try out your proposal. I'd be very
happy to have a simpler solution, but I'm also pretty sure
this complexity is necessary.

Taking a step back, I do think this approach results in a
better API, even though the change is a little complicated.

Thanks,
-John

On Sun, 2022-03-06 at 10:51 +0000, Jorge Esteban Quilcate Otoya wrote:
> Matthias, thanks for your feedback.
> 
> I can see the following alternatives to deal with `processValues()`:
> 
> 1. Runtime key validation (current proposal)
> 2. Using Void type. Guozhang already points out some important
> considerations about allocating `Record` twice.
> 3. Adding a new ValueRecord, proposed by Matthias. This one would carry
> some of the problems of the second alternative, as a ValueRecord will have
> to be created from a Record. Also, whether via a public constructor or via
> creation from a Record, the key _can_ be changed without being captured by
> the Topology.
> 4. Reducing the KIP scope to `process` only, and removing/postponing
> `processValues` for a later DSL redesign.
> 
> A couple of additional comments:
> 
> About the Record API:
> 
> IIUC, the issue with allocating new objects comes from the current design
> of the Record API.
> If a user does record.withKey(...).withValue(...), that already leads to a
> couple of instantiations.
> My impression is that if the cost/value of immutability has been weighed
> already, then maybe the considerations for alternative 2 can be disregarded?
> Either way, if the cost of recreating objects is something we want to
> minimize, then maybe adding a Builder to the record would help to reduce
> the allocations.
> 
> About the key validation:
> 
> So far, the only way I can see to _really_ validate at compile time that a
> key doesn't change is by not exposing it at all, as we are doing today with
> Transform.
> Otherwise, deal with it at runtime, as we have been dealing with Transform
> without the ability to forward.
> The Processor API, by definition, means a lower-level abstraction, so users
> should be aware of the potential runtime exceptions if the key changes.
> This is why I'm leaning towards alternative 1.
> 
> Looking forward to your feedback.
> As a reminder, the vote thread is still open. Feel free to add your vote or
> amend if needed.
> 
> Cheers,
> 
> 
> On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax wrote:
> 
> > John, thanks for verifying source compatibility. My impression was that
> > it should be source compatible; I was just not 100% sure.
> >
> > The question about `processValues()` is really a hard one. Guozhang's
> > point is a very good one. Maybe we need to be pragmatic and accept the
> > runtime check (even if I deeply hate this solution compared to a
> > compile-time check).
> >
> > Other possibilities to address this issue might just become too ugly? It
> > seems it would require adding a new `ValueProcessorContext` that offers

Re: [DISCUSS] KIP-820: Extend KStream process with new Processor API

2022-03-06 Thread Jorge Esteban Quilcate Otoya
Matthias, thanks for your feedback.

I can see the following alternatives to deal with `processValues()`:

1. Runtime key validation (current proposal)
2. Using Void type. Guozhang already points out some important
considerations about allocating `Record` twice.
3. Adding a new ValueRecord, proposed by Matthias. This one would carry some
of the problems of the second alternative, as a ValueRecord will have to be
created from a Record. Also, whether via a public constructor or via creation
from a Record, the key _can_ be changed without being captured by the
Topology.
4. Reducing the KIP scope to `process` only, and removing/postponing
`processValues` for a later DSL redesign.

A couple of additional comments:

About the Record API:

IIUC, the issue with allocating new objects comes from the current design of
the Record API.
If a user does record.withKey(...).withValue(...), that already leads to a
couple of instantiations.
My impression is that if the cost/value of immutability has been weighed
already, then maybe the considerations for alternative 2 can be disregarded?
Either way, if the cost of recreating objects is something we want to
minimize, then maybe adding a Builder to the record would help to reduce the
allocations.
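
To illustrate the difference (a hypothetical sketch, not the actual Record
API): each chained with* call copies the record, while a builder defers
everything to a single allocation at build().

    // Hypothetical sketch contrasting chained with* copies against a builder.
    final class RecordSketch<K, V> {
        final K key;
        final V value;

        RecordSketch(final K key, final V value) {
            this.key = key;
            this.value = value;
        }

        // Each with* call copies the record: two calls, two allocations.
        <NewK> RecordSketch<NewK, V> withKey(final NewK newKey) {
            return new RecordSketch<>(newKey, value);
        }

        <NewV> RecordSketch<K, NewV> withValue(final NewV newValue) {
            return new RecordSketch<>(key, newValue);
        }

        // A builder mutates its own fields and allocates the record once.
        static final class Builder<K, V> {
            private K key;
            private V value;

            Builder<K, V> key(final K key) { this.key = key; return this; }
            Builder<K, V> value(final V value) { this.value = value; return this; }
            RecordSketch<K, V> build() { return new RecordSketch<>(key, value); }
        }
    }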

About the key validation:

So far, the only way I can see to _really_ validate at compile time that a
key doesn't change is by not exposing it at all, as we are doing today with
Transform.
Otherwise, deal with it at runtime, as we have been dealing with Transform
without the ability to forward.
The Processor API, by definition, means a lower-level abstraction, so users
should be aware of the potential runtime exceptions if the key changes.
This is why I'm leaning towards alternative 1.
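
As a sketch, the runtime check of alternative 1 could look roughly like this
(illustrative only, not an actual implementation):

    // Illustrative sketch of alternative 1: verify on forward() that the key
    // is still the same object the processor received.
    class KeyCheckSketch<K> {
        private K currentInputKey;

        void onProcess(final K inputKey) {
            this.currentInputKey = inputKey;
        }

        <V> void forward(final K key, final V value) {
            // Reference check: the forwarded key must be the key received.
            if (key != currentInputKey) {
                throw new IllegalStateException(
                    "processValues() must not change the record key");
            }
            // ...hand the record to the downstream node...
        }
    }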

Looking forward to your feedback.
As a reminder, the vote thread is still open. Feel free to add your vote or
amend if needed.

Cheers,


On Fri, 4 Mar 2022 at 06:51, Matthias J. Sax wrote:

> John, thanks for verifying source compatibility. My impression was that
> it should be source compatible; I was just not 100% sure.
>
> The question about `processValues()` is really a hard one. Guozhang's
> point is a very good one. Maybe we need to be pragmatic and accept the
> runtime check (even if I deeply hate this solution compared to a
> compile-time check).
>
> Other possibilities to address this issue might just become too ugly? It
> seems it would require adding a new `ValueProcessorContext` that offers a
> `#forward(ValueRecord)` method (with `ValueRecord` being a `Record` with an
> immutable key)? Not sure if we would be willing to go down this route?
> Personally, I would be ok with it, as I strongly prefer compile-time checks
> and I am happy to extend the API surface area to achieve it -- however, I
> won't be surprised if others don't like this idea...
>
>
>
> -Matthias
>
> On 2/27/22 6:20 AM, Jorge Esteban Quilcate Otoya wrote:
> > Thanks, Guozhang.
> >
> >> Compared with reference checks and runtime exceptions for those who
> >> mistakenly change the key, I think that enforcing everyone to `setValue`
> >> may incur more costs..
> >
> > This is a fair point. I agree that this may incur more costs than key
> > checking.
> >
> > Will hold for more feedback, but if we agree I will update the KIP during
> > the week.
> >
> > Cheers,
> > Jorge.
> >
> >
> > On Sun, 27 Feb 2022 at 00:50, Guozhang Wang  wrote:
> >
> >> Hello folks,
> >>
> >> Regarding the outstanding question, I'm actually a bit leaning towards
> >> the second option, since `withKey()` itself always creates a new Record
> >> object. This has a few implications:
> >>
> >> * We would have to discard the previous Record object to be GC'ed along
> >> with the new object -- note that in practice, processing the value does
> >> not mean you'd have to replace the whole value with `withValue`; maybe
> >> you just need to manipulate some fields of the value object if it is
> >> JSON, etc.
> >> * It may become an obstacle for further runtime optimizations, e.g.
> >> skipping serdes and interpreting processing as direct byte manipulations.
> >>
> >> Compared with reference checks and runtime exceptions for those who
> >> mistakenly change the key, I think that enforcing everyone to `setValue`
> >> may incur more costs..
> >>
> >> Guozhang
> >>
> >> On Fri, Feb 25, 2022 at 12:54 PM Jorge Esteban Quilcate Otoya
> >> <quilcate.jo...@gmail.com> wrote:
> >>
> >>> Hi all,
> >>>
> >>> Appreciate very much all the great feedback received so far.
> >>>
> >>> > After applying that interface change, I don't see any syntax
> >>> > errors in our tests (which use those methods), and the
> >>> > StreamBuilderTest still passes for me.
> >>>
> >>> This is awesome John, thank you for your efforts here.
> >>>
> >>> > Jorge, do you mind clarifying these points in the Compatibility
> >>> > section of your KIP?
> >>>
> >>> +1. I have clarified the impact of changing the return type in the KIP.
> >>>
> >>> > I think the other outstanding question for you is whether