[jira] [Created] (KAFKA-16666) Remove unused class `GroupMetadataMessageFormatter` and `OffsetsMessageFormatter`

2024-05-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-1:
--

 Summary: Remove unused class `GroupMetadataMessageFormatter` and 
`OffsetsMessageFormatter`
 Key: KAFKA-1
 URL: https://issues.apache.org/jira/browse/KAFKA-1
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


After https://github.com/apache/kafka/pull/15652, both 
`GroupMetadataMessageFormatter`[0] and `OffsetsMessageFormatter`[1] get unused. 
We should remove them.

[0] 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1269
[1] 
https://github.com/apache/kafka/blob/376e9e20dbf7c7aeb6f6f666d47932c445eb6bd1/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L1248



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1018: Introduce max remote fetch timeout config

2024-05-03 Thread Kamal Chandraprakash
Luke,

Thanks for the review!

DelayedFetch and DelayedRemoteFetch are orthogonal calls

.
Only one of them will be active in a given FETCH request.

The fetch request might take more than 500 ms when the time
taken to read data from remote storage exceeds 500 ms and
`remote.fetch.max.wait.ms` is configured higher than 500 ms.

--
Kamal


On Fri, May 3, 2024 at 1:55 PM Luke Chen  wrote:

> Hi Kamal,
>
> Thanks for the KIP!
> Sorry for the late review.
>
> Overall LGTM! Just 1 question:
>
> If one fetch request contains 2 partitions: [p1, p2]
> fetch.max.wait.ms: 500, remote.fetch.max.wait.ms: 1000
>
> And now, p1 fetch offset is the log end offset and has no new data coming,
> and p2 fetch offset is to fetch from remote storage.
> And suppose the fetch from remote storage takes 1000ms.
> So, question:
> Will this fetch request return in 500ms or 1000ms?
> And what will be returned?
>
> I think before this change, it'll return within 500ms, right?
> But it's not clear what behavior it will be after this KIP.
>
> Thank you.
> Luke
>
>
> On Fri, May 3, 2024 at 1:56 PM Kamal Chandraprakash <
> kamal.chandraprak...@gmail.com> wrote:
>
> > Christo,
> >
> > We have localTimeMs, remoteTimeMs, and totalTimeMs as part of the
> > FetchConsumer request metric.
> >
> >
> >
> kafka.network:type=RequestMetrics,name={LocalTimeMs|RemoteTimeMs|TotalTimeMs},request={Produce|FetchConsumer|FetchFollower}
> >
> > RemoteTimeMs refers to the amount of time spent in the purgatory for
> normal
> > fetch requests
> > and amount of time spent in reading the remote data for remote-fetch
> > requests. Do we want
> > to have a separate `TieredStorageTimeMs` to capture the time spent in
> > remote-read requests?
> >
> > With per-broker level timer metrics combined with the request level
> > metrics, the user will have
> > sufficient information.
> >
> > Metric name =
> >
> >
> kafka.log.remote:type=RemoteLogManager,name=RemoteLogReaderFetchRateAndTimeMs
> >
> > --
> > Kamal
> >
> > On Mon, Apr 29, 2024 at 1:38 PM Christo Lolov 
> > wrote:
> >
> > > Heya!
> > >
> > > Is it difficult to instead add the metric at
> > > kafka.network:type=RequestMetrics,name=TieredStorageMs (or some other
> > > name=*)? Alternatively, if it is difficult to add it there, is it
> > possible
> > > to add 2 metrics, one at the RequestMetrics level (even if it is
> > > total-time-ms - (all other times)) and one at what you are proposing?
> As
> > an
> > > operator I would find it strange to not see the metric in the
> > > RequestMetrics.
> > >
> > > Your thoughts?
> > >
> > > Best,
> > > Christo
> > >
> > > On Sun, 28 Apr 2024 at 10:52, Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > Christo,
> > > >
> > > > Updated the KIP with the remote fetch latency metric. Please take
> > another
> > > > look!
> > > >
> > > > --
> > > > Kamal
> > > >
> > > > On Sun, Apr 28, 2024 at 12:23 PM Kamal Chandraprakash <
> > > > kamal.chandraprak...@gmail.com> wrote:
> > > >
> > > > > Hi Federico,
> > > > >
> > > > > Thanks for the suggestion! Updated the config name to "
> > > > > remote.fetch.max.wait.ms".
> > > > >
> > > > > Christo,
> > > > >
> > > > > Good point. We don't have the remote-read latency metrics to
> measure
> > > the
> > > > > performance of the remote read requests. I'll update the KIP to
> emit
> > > this
> > > > > metric.
> > > > >
> > > > > --
> > > > > Kamal
> > > > >
> > > > >
> > > > > On Sat, Apr 27, 2024 at 4:03 PM Federico Valeri <
> > fedeval...@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Kamal, it looks like all TS configurations starts with
> "remote."
> > > > >> prefix, so I was wondering if we should name it
> > > > >> "remote.fetch.max.wait.ms".
> > > > >>
> > > > >> On Fri, Apr 26, 2024 at 7:07 PM Kamal Chandraprakash
> > > > >>  wrote:
> > > > >> >
> > > > >> > Hi all,
> > > > >> >
> > > > >> > If there are no more comments, I'll start a vote thread by
> > tomorrow.
> > > > >> > Please review the KIP.
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Kamal
> > > > >> >
> > > > >> > On Sat, Mar 30, 2024 at 11:08 PM Kamal Chandraprakash <
> > > > >> > kamal.chandraprak...@gmail.com> wrote:
> > > > >> >
> > > > >> > > Hi all,
> > > > >> > >
> > > > >> > > Bumping the thread. Please review this KIP. Thanks!
> > > > >> > >
> > > > >> > > On Thu, Feb 1, 2024 at 9:11 PM Kamal Chandraprakash <
> > > > >> > > kamal.chandraprak...@gmail.com> wrote:
> > > > >> > >
> > > > >> > >> Hi Jorge,
> > > > >> > >>
> > > > >> > >> Thanks for the review! Added your suggestions to the KIP.
> PTAL.
> > > > >> > >>
> > > > >> > >> The `fetch.max.wait.ms` config will be also applicable for
> > > topics
> > > > >> > >> enabled with remote storage.
> > > > >> > >> Updated the description to:
> > > > >> > >>
> > > > >> > >> ```
> > > > >> > >> The maximum amount of time the server will block before
> > answering
> > > > the
> > > > 

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-03 Thread Andrew Schofield
Hi Jun,
Thanks for your reply.

161. ShareGroupListing and ShareGroupDescription are using
the same pattern as ConsumerGroupListing and
ConsumerGroupDescription. I have gone for consistency which
I think is probably best here. It’s what I would expect if I had previously
used the admin API for consumer groups and was looking to use it for
share groups. I agree it’s a bit weird.

162. GroupListing contains the only information which is properly
in common between a ConsumerGroupListing and a ShareGroupListing.
ListGroupsResponse.ProtocolType is interpreted to provide the
group type. I know that the ListGroups RPC also includes the group
state, but that’s as a string and there’s no common enum for the states
of all types of group. As a result, I have exposed group type but not
state on this API.

Previously in the discussion for this KIP, I mentioned that I would
create another KIP for the administration of groups, in particular
how the administrator can ensure that particular group IDs
are used for the group type they desire. At the moment, I think
keeping ListGroups in this KIP is a good idea. If we actually want
to make it more sophisticated, perhaps that would be better with
the group administration KIP.

163. It will be one higher than the latest version at the time we are
ready to deliver this feature for real. When we are on the cusp of
delivery, I’ll update the KIP with the final value.

164. KRaft only. All the RPCs are “broker” only. None of the code will
be merged until after 3.8 has branched.

Thanks,
Andrew

> On 4 May 2024, at 00:12, Jun Rao  wrote:
> 
> Hi, Andrew,
> 
> Thanks for the reply. A few more comments.
> 
> 161. ShareGroupListing.state() returns an optional, but
> ShareGroupDescription.state() does not. Should we make them consistent?
> Also, it seems a bit weird to return optional with an UNKNOWN state.
> 
> 162. Should GroupListing include ProtocolType and GroupState too?
> 
> 163. What is the value of group.version to gate the queuing feature?
> 
> 164. Is the queueing feature only supported on KRaft clusters? For example,
> the feature tool seems to be built only for the KRaft cluster.
> 
> Jun
> 
> On Fri, May 3, 2024 at 10:32 AM Andrew Schofield 
> wrote:
> 
>> Hi Jun,
>> Thanks for your reply.
>> 
>> 147. Yes, I see what you mean. The rebalance latency will indeed
>> be very short by comparison. I have removed the rebalance latency
>> metrics from the client and retained the rebalance count and rate.
>> 
>> 150. Yes, I think so. I have tweaked the text so that the simple
>> assignor will take into account existing assignment information when
>> it has it, which would just minimise unnecessary churn of (b).
>> 
>> 158. I’ve changed it to ReadShareGroupStateSummary.
>> 
>> Thanks,
>> Andrew
>> 
>> 
>>> On 3 May 2024, at 22:17, Jun Rao  wrote:
>>> 
>>> Hi, Andrew,
>>> 
>>> Thanks for the reply.
>>> 
>>> 147. There seems to be some difference between consumer groups and share
>>> groups. In the consumer groups, if a client receives a heartbeat response
>>> to revoke some partitions, it may have to commit offsets before revoking
>>> partitions or it may have to call the rebalance callbacks provided by the
>>> user. This may take some time and can be reflected in the rebalance time
>>> metric. In the share groups, none of that exists. If a client receives
>> some
>>> added/revoked partitions, it accepts them immediately, right? So, does
>> that
>>> practically make the rebalance time always 0?
>>> 
>>> 150. I guess in the common case, there will be many more members than
>>> partitions. So the need for (b) will be less common. We can probably
>> leave
>>> the persisting of the assignment out for now.
>>> 
>>> 158. The new name sounds good to me.
>>> 
>>> Jun
>>> 
>>> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield <
>> andrew_schofi...@live.com>
>>> wrote:
>>> 
 Hi Jun,
 Thanks for the response.
 
 147. I am trying to get a correspondence between the concepts and
 metrics of consumer groups and share groups. In both cases,
 the client doesn’t strictly know when the rebalance starts. All it knows
 is when it has work to do in order to perform its part of a rebalance.
 I am proposing that share groups and consumer groups use
 equivalent logic.
 
 I could remove the rebalance metrics from the client because I do
 understand that they are making a judgement about when a rebalance
 starts, but it’s their own part of the rebalance they are measuring.
 
 I tend to think these metrics are better than no metrics and
 will at least enable administrators to see how much rebalance
 activity the members of share groups are experiencing.
 
 150. The simple assignor does not take existing assignments into
 consideration. The ShareGroupPartitionAssignor interface would
 permit this, but the simple assignor does not currently use it.
 
 The simple assignor assigns partitions in two ways:
 a) Distribute 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2869

2024-05-03 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
That's good questions... I could think of a few approaches, but I admit 
it might all be a little bit tricky to code up...


However if we don't solve this problem, I think this KIP does not really 
solve the core issue we are facing? In the end, if we rely on the 
`.checkpoint` file to compute a task assignment, but the `.checkpoint` 
file can be arbitrary stale after a crash because we only write it on a 
clean close, there would be still a huge gap that this KIP does not close?


For the case in which we keep the checkpoint file, this KIP would still 
help for "soft errors" in which KS can recover, and roll back the store. 
A significant win for sure. -- But hard crashes would still be an 
problem? We might assign tasks to "wrong" instance, ie, which are not 
most up to date, as the checkpoint information could be very outdated? 
Would we end up with a half-baked solution? Would this be good enough to 
justify the introduced complexity? In the, for soft failures it's still 
a win. Just want to make sure we understand the limitations and make an 
educated decision.


Or do I miss something?


-Matthias

On 5/3/24 10:20 AM, Bruno Cadonna wrote:

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be mood anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. 
Might be something to be worth calling out explicitly in the KIP 
writeup. -- Now that I realize that the position is tracked inside the 
store (not outside as the changelog offsets) it makes much more sense 
to pull position into RocksDB itself. In the end, it's actually a 
"store implementation" detail how it tracks the position (and kinda 
leaky abstraction currently, that we re-use the checkpoint file 
mechanism to track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not 
too bad? When KS starts up, we could upon all stores we find on local 
disk pro-actively, and keep them all open until the first rebalance 
finishes: For tasks we get assigned, we hand in the already opened 
store (this would amortize the cost to open the store before the 
rebalance) and for non-assigned tasks, we know the offset information 
won't change and we could just cache it in-memory for later reuse (ie, 
next rebalance) and close the store to free up resources? -- Assuming 
that we would get a large percentage of opened stores assigned as 
tasks anyway, this could work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data 
structure. If Kafka Streams writes its position to the .position file 
during a commit and a crash happens before RocksDB persist the 
memtable then the position in the .position file is ahead of the 
persisted offset. If IQ is done between the crash and the state store 
fully restored the changelog, the position might tell IQ that the 
state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same 
as persisting offset, the position should always be consistent with 
the offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map 
passed via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the 
position into the implementation of the StateStore interface since 
the position is updated within the implementation of the StateStore 
interface (e.g. RocksDBStore [1]). My statement describes the 
behavior now, not the change proposed in this KIP, so it does not 
contradict what is stated in the KIP.



200:
This is about Matthias' main concern about 

[jira] [Resolved] (KAFKA-16474) AsyncKafkaConsumer might send out heartbeat request without waiting for its response

2024-05-03 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16474?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee resolved KAFKA-16474.

Resolution: Fixed

Ran the test several times, the client log also looks fine.

> AsyncKafkaConsumer might send out heartbeat request without waiting for its 
> response
> 
>
> Key: KAFKA-16474
> URL: https://issues.apache.org/jira/browse/KAFKA-16474
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Critical
>  Labels: kip-848-client-support
> Fix For: 3.8.0
>
> Attachments: failing_results.zip
>
>
> KAFKA-16389
> We've discovered that in some uncommon cases, the consumer could send out 
> successive heartbeats without waiting for the response to come back.  this 
> might result in causing the consumer to revoke its just assigned assignments 
> in some cases.  For example:
>  
> The consumer first sends out a heartbeat with epoch=0 and memberId='' 
> The consumer then rapidly sends out another heartbeat with epoch=0 and 
> memberId='' because it has not gotten any response and thus not updating its 
> local state
>  
> The consumer receives assignments from the first heartbeat and reconciles its 
> assignment.
>  
> Since the second heartbeat has epoch=0 and memberId='', the server will think 
> this is a new member joining and therefore send out an empty assignment.  
>  
> The consumer receives the response from the second heartbeat.  Revoke all of 
> its partitions.
>  
> There are 2 issues associate with this bug:
>  # inflight logic
>  # rapid poll: In the KAFKA-16389 we've observe consumer polling interval to 
> be a few ms.  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-03 Thread Jun Rao
Hi, Andrew,

Thanks for the reply. A few more comments.

161. ShareGroupListing.state() returns an optional, but
ShareGroupDescription.state() does not. Should we make them consistent?
Also, it seems a bit weird to return optional with an UNKNOWN state.

162. Should GroupListing include ProtocolType and GroupState too?

163. What is the value of group.version to gate the queuing feature?

164. Is the queueing feature only supported on KRaft clusters? For example,
the feature tool seems to be built only for the KRaft cluster.

Jun

On Fri, May 3, 2024 at 10:32 AM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for your reply.
>
> 147. Yes, I see what you mean. The rebalance latency will indeed
> be very short by comparison. I have removed the rebalance latency
> metrics from the client and retained the rebalance count and rate.
>
> 150. Yes, I think so. I have tweaked the text so that the simple
> assignor will take into account existing assignment information when
> it has it, which would just minimise unnecessary churn of (b).
>
> 158. I’ve changed it to ReadShareGroupStateSummary.
>
> Thanks,
> Andrew
>
>
> > On 3 May 2024, at 22:17, Jun Rao  wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the reply.
> >
> > 147. There seems to be some difference between consumer groups and share
> > groups. In the consumer groups, if a client receives a heartbeat response
> > to revoke some partitions, it may have to commit offsets before revoking
> > partitions or it may have to call the rebalance callbacks provided by the
> > user. This may take some time and can be reflected in the rebalance time
> > metric. In the share groups, none of that exists. If a client receives
> some
> > added/revoked partitions, it accepts them immediately, right? So, does
> that
> > practically make the rebalance time always 0?
> >
> > 150. I guess in the common case, there will be many more members than
> > partitions. So the need for (b) will be less common. We can probably
> leave
> > the persisting of the assignment out for now.
> >
> > 158. The new name sounds good to me.
> >
> > Jun
> >
> > On Thu, May 2, 2024 at 10:21 PM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Jun,
> >> Thanks for the response.
> >>
> >> 147. I am trying to get a correspondence between the concepts and
> >> metrics of consumer groups and share groups. In both cases,
> >> the client doesn’t strictly know when the rebalance starts. All it knows
> >> is when it has work to do in order to perform its part of a rebalance.
> >> I am proposing that share groups and consumer groups use
> >> equivalent logic.
> >>
> >> I could remove the rebalance metrics from the client because I do
> >> understand that they are making a judgement about when a rebalance
> >> starts, but it’s their own part of the rebalance they are measuring.
> >>
> >> I tend to think these metrics are better than no metrics and
> >> will at least enable administrators to see how much rebalance
> >> activity the members of share groups are experiencing.
> >>
> >> 150. The simple assignor does not take existing assignments into
> >> consideration. The ShareGroupPartitionAssignor interface would
> >> permit this, but the simple assignor does not currently use it.
> >>
> >> The simple assignor assigns partitions in two ways:
> >> a) Distribute the members across the partitions by hashed member ID.
> >> b) If any partitions have no members assigned, distribute the members
> >> across these partitions round-robin.
> >>
> >> The (a) partitions will be quite stable. The (b) partitions will be less
> >> stable. By using existing assignment information, it could make (b)
> >> partition assignment more stable, whether the assignments are
> >> persisted or not. Perhaps it would be worth changing the simple
> >> assignor in order to make (b) more stable.
> >>
> >> I envisage more sophisticated assignors in the future which could use
> >> existing assignments and also other dynamic factors such as lag.
> >>
> >> If it transpires that there is significant benefit in persisting
> >> assignments
> >> specifically to help smooth assignment in the event of GC change,
> >> it would be quite an easy enhancement. I am not inclined to persist
> >> the assignments in this KIP.
> >>
> >> 158. Ah, yes. I see. Of course, I want the names as consistent and
> >> understandable too. I suggest renaming
> >> ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
> >> I haven’t changed the KIP yet, so let me know if that’s OK.
> >>
> >> Thanks,
> >> Andrew
> >>
> >>> On 2 May 2024, at 22:18, Jun Rao  wrote:
> >>>
> >>> Hi, Andrew,
> >>>
> >>> Thanks for the reply.
> >>>
> >>> 147. " it makes a judgement about whether an assignment received is
> equal
> >>> to what it already is using."
> >>> If a client receives an assignment different from what it has, it
> >> indicates
> >>> the end of the rebalance. But how does the client know when the
> rebalance
> >>> starts? In the shareHeartbeat design, the new group 

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-03 Thread Andrew Schofield
Hi Jun,
Thanks for your reply.

147. Yes, I see what you mean. The rebalance latency will indeed
be very short by comparison. I have removed the rebalance latency
metrics from the client and retained the rebalance count and rate.

150. Yes, I think so. I have tweaked the text so that the simple
assignor will take into account existing assignment information when
it has it, which would just minimise unnecessary churn of (b).

158. I’ve changed it to ReadShareGroupStateSummary.

Thanks,
Andrew


> On 3 May 2024, at 22:17, Jun Rao  wrote:
> 
> Hi, Andrew,
> 
> Thanks for the reply.
> 
> 147. There seems to be some difference between consumer groups and share
> groups. In the consumer groups, if a client receives a heartbeat response
> to revoke some partitions, it may have to commit offsets before revoking
> partitions or it may have to call the rebalance callbacks provided by the
> user. This may take some time and can be reflected in the rebalance time
> metric. In the share groups, none of that exists. If a client receives some
> added/revoked partitions, it accepts them immediately, right? So, does that
> practically make the rebalance time always 0?
> 
> 150. I guess in the common case, there will be many more members than
> partitions. So the need for (b) will be less common. We can probably leave
> the persisting of the assignment out for now.
> 
> 158. The new name sounds good to me.
> 
> Jun
> 
> On Thu, May 2, 2024 at 10:21 PM Andrew Schofield 
> wrote:
> 
>> Hi Jun,
>> Thanks for the response.
>> 
>> 147. I am trying to get a correspondence between the concepts and
>> metrics of consumer groups and share groups. In both cases,
>> the client doesn’t strictly know when the rebalance starts. All it knows
>> is when it has work to do in order to perform its part of a rebalance.
>> I am proposing that share groups and consumer groups use
>> equivalent logic.
>> 
>> I could remove the rebalance metrics from the client because I do
>> understand that they are making a judgement about when a rebalance
>> starts, but it’s their own part of the rebalance they are measuring.
>> 
>> I tend to think these metrics are better than no metrics and
>> will at least enable administrators to see how much rebalance
>> activity the members of share groups are experiencing.
>> 
>> 150. The simple assignor does not take existing assignments into
>> consideration. The ShareGroupPartitionAssignor interface would
>> permit this, but the simple assignor does not currently use it.
>> 
>> The simple assignor assigns partitions in two ways:
>> a) Distribute the members across the partitions by hashed member ID.
>> b) If any partitions have no members assigned, distribute the members
>> across these partitions round-robin.
>> 
>> The (a) partitions will be quite stable. The (b) partitions will be less
>> stable. By using existing assignment information, it could make (b)
>> partition assignment more stable, whether the assignments are
>> persisted or not. Perhaps it would be worth changing the simple
>> assignor in order to make (b) more stable.
>> 
>> I envisage more sophisticated assignors in the future which could use
>> existing assignments and also other dynamic factors such as lag.
>> 
>> If it transpires that there is significant benefit in persisting
>> assignments
>> specifically to help smooth assignment in the event of GC change,
>> it would be quite an easy enhancement. I am not inclined to persist
>> the assignments in this KIP.
>> 
>> 158. Ah, yes. I see. Of course, I want the names as consistent and
>> understandable too. I suggest renaming
>> ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
>> I haven’t changed the KIP yet, so let me know if that’s OK.
>> 
>> Thanks,
>> Andrew
>> 
>>> On 2 May 2024, at 22:18, Jun Rao  wrote:
>>> 
>>> Hi, Andrew,
>>> 
>>> Thanks for the reply.
>>> 
>>> 147. " it makes a judgement about whether an assignment received is equal
>>> to what it already is using."
>>> If a client receives an assignment different from what it has, it
>> indicates
>>> the end of the rebalance. But how does the client know when the rebalance
>>> starts? In the shareHeartbeat design, the new group epoch is propagated
>>> together with the new assignment in the response.
>>> 
>>> 150. It could be a potential concern if each GC change forces significant
>>> assignment changes. Does the default assignor take existing assignments
>>> into consideration?
>>> 
>>> 155. Good point. Sounds good.
>>> 
>>> 158. My concern with the current naming is that it's not clear what the
>>> difference is between ReadShareGroupOffsetsState and ReadShareGroupState.
>>> The state in the latter is also offsets.
>>> 
>>> Jun
>>> 
>>> On Wed, May 1, 2024 at 9:51 PM Andrew Schofield <
>> andrew_schofi...@live.com>
>>> wrote:
>>> 
 Hi Jun,
 Thanks for your reply.
 
 147. Perhaps the easiest is to take a look at the code in
 o.a.k.clients.consumer.internal.MembershipManagerImpl.
 This class is part of 

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna

Hi Matthias,


200:
I like the idea in general. However, it is not clear to me how the 
behavior should be with multiple stream threads in the same Kafka 
Streams client. What stream thread opens which store? How can a stream 
thread pass an open store to another stream thread that got the 
corresponding task assigned? How does a stream thread know that a task 
was not assigned to any of the stream threads of the Kafka Streams 
client? I have the feeling we should just keep the .checkpoint file on 
close for now to unblock this KIP and try to find a solution to get 
totally rid of it later.



Best,
Bruno



On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be mood anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. Might 
be something to be worth calling out explicitly in the KIP writeup. -- 
Now that I realize that the position is tracked inside the store (not 
outside as the changelog offsets) it makes much more sense to pull 
position into RocksDB itself. In the end, it's actually a "store 
implementation" detail how it tracks the position (and kinda leaky 
abstraction currently, that we re-use the checkpoint file mechanism to 
track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not too 
bad? When KS starts up, we could upon all stores we find on local disk 
pro-actively, and keep them all open until the first rebalance finishes: 
For tasks we get assigned, we hand in the already opened store (this 
would amortize the cost to open the store before the rebalance) and for 
non-assigned tasks, we know the offset information won't change and we 
could just cache it in-memory for later reuse (ie, next rebalance) and 
close the store to free up resources? -- Assuming that we would get a 
large percentage of opened stores assigned as tasks anyway, this could 
work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data 
structure. If Kafka Streams writes its position to the .position file 
during a commit and a crash happens before RocksDB persist the 
memtable then the position in the .position file is ahead of the 
persisted offset. If IQ is done between the crash and the state store 
fully restored the changelog, the position might tell IQ that the 
state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map 
passed via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated 
in the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the 
offsets managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure 
I do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position 
is the offset from the upstream source topic, right? -- In the end, 
the position is about IQ, and if we fail to update it, it only means 
that there is some gap when we might not be able to query a standby 
task, because we think it's not up-to-date enough even if it is, 
which would resolve itself soon? Ie, the position might "lag", but 
it's not "inconsistent". Do we believe that this lag 

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-03 Thread Jun Rao
Hi, Andrew,

Thanks for the reply.

147. There seems to be some difference between consumer groups and share
groups. In the consumer groups, if a client receives a heartbeat response
to revoke some partitions, it may have to commit offsets before revoking
partitions or it may have to call the rebalance callbacks provided by the
user. This may take some time and can be reflected in the rebalance time
metric. In the share groups, none of that exists. If a client receives some
added/revoked partitions, it accepts them immediately, right? So, does that
practically make the rebalance time always 0?

150. I guess in the common case, there will be many more members than
partitions. So the need for (b) will be less common. We can probably leave
the persisting of the assignment out for now.

158. The new name sounds good to me.

Jun

On Thu, May 2, 2024 at 10:21 PM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for the response.
>
> 147. I am trying to get a correspondence between the concepts and
> metrics of consumer groups and share groups. In both cases,
> the client doesn’t strictly know when the rebalance starts. All it knows
> is when it has work to do in order to perform its part of a rebalance.
> I am proposing that share groups and consumer groups use
> equivalent logic.
>
> I could remove the rebalance metrics from the client because I do
> understand that they are making a judgement about when a rebalance
> starts, but it’s their own part of the rebalance they are measuring.
>
> I tend to think these metrics are better than no metrics and
> will at least enable administrators to see how much rebalance
> activity the members of share groups are experiencing.
>
> 150. The simple assignor does not take existing assignments into
> consideration. The ShareGroupPartitionAssignor interface would
> permit this, but the simple assignor does not currently use it.
>
> The simple assignor assigns partitions in two ways:
> a) Distribute the members across the partitions by hashed member ID.
> b) If any partitions have no members assigned, distribute the members
> across these partitions round-robin.
>
> The (a) partitions will be quite stable. The (b) partitions will be less
> stable. By using existing assignment information, it could make (b)
> partition assignment more stable, whether the assignments are
> persisted or not. Perhaps it would be worth changing the simple
> assignor in order to make (b) more stable.
>
> I envisage more sophisticated assignors in the future which could use
> existing assignments and also other dynamic factors such as lag.
>
> If it transpires that there is significant benefit in persisting
> assignments
> specifically to help smooth assignment in the event of GC change,
> it would be quite an easy enhancement. I am not inclined to persist
> the assignments in this KIP.
>
> 158. Ah, yes. I see. Of course, I want the names as consistent and
> understandable too. I suggest renaming
> ReadShareGroupOffsetsState to ReadShareGroupStateSummary.
> I haven’t changed the KIP yet, so let me know if that’s OK.
>
> Thanks,
> Andrew
>
> > On 2 May 2024, at 22:18, Jun Rao  wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the reply.
> >
> > 147. " it makes a judgement about whether an assignment received is equal
> > to what it already is using."
> > If a client receives an assignment different from what it has, it
> indicates
> > the end of the rebalance. But how does the client know when the rebalance
> > starts? In the shareHeartbeat design, the new group epoch is propagated
> > together with the new assignment in the response.
> >
> > 150. It could be a potential concern if each GC change forces significant
> > assignment changes. Does the default assignor take existing assignments
> > into consideration?
> >
> > 155. Good point. Sounds good.
> >
> > 158. My concern with the current naming is that it's not clear what the
> > difference is between ReadShareGroupOffsetsState and ReadShareGroupState.
> > The state in the latter is also offsets.
> >
> > Jun
> >
> > On Wed, May 1, 2024 at 9:51 PM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Jun,
> >> Thanks for your reply.
> >>
> >> 147. Perhaps the easiest is to take a look at the code in
> >> o.a.k.clients.consumer.internal.MembershipManagerImpl.
> >> This class is part of the new consumer group protocol
> >> code in the client. It makes state transitions based on
> >> the heartbeat requests and responses, and it makes a
> >> judgement about whether an assignment received is
> >> equal to what it already is using. When a state transition
> >> is deemed to be the beginning or end of a rebalance
> >> from the point of view of this client, it counts towards the
> >> rebalance metrics.
> >>
> >> Share groups will follow the same path.
> >>
> >> 150. I do not consider it a concern. Rebalancing a share group
> >> is less disruptive than rebalancing a consumer group. If the assignor
> >> Has information about existing assignments, it 

Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-03 Thread Matthias J. Sax

Please also update the KIP.

To get a wiki account created, please request it via a commet on this 
ticket: https://issues.apache.org/jira/browse/INFRA-25451


After you have the account, please share your wiki id, and we can give 
you write permission on the wiki.




-Matthias

On 5/3/24 6:30 AM, Shashwat Pandey wrote:

Hi Matthias,

Sorry this fell out of my radar for a bit.

Revisiting the topic, I think you’re right and we accept the duplicated
nesting as an appropriate solution to not affect the larger public API.

I can update my PR with the change.

Regards,
Shashwat Pandey


On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax  wrote:


Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:

It seems that `MockRecordMetadata` is a private class, and thus not part
of the public API. If there are any changes required, we don't need to
discuss on the KIP.


For `CapturedPunctuator` and `CapturedForward` it's a little bit more
tricky. My gut feeling is, that the classes might not need to be
changed, but if we use them within `MockProcessorContext` and
`MockFixedKeyProcessorContext` it might be weird to keep the current
nesting... The problem I see is, that it's not straightforward how to
move the classes w/o breaking compatibility, nor if we duplicate them as
standalone classes w/o a larger "splash radius". (We would need to add
new overloads for MockProcessorContext#scheduledPunctuators() and
MockProcessorContext#forwarded()).

Might be good to hear from others if we think it's worth this larger
changes to get rid of the nesting, or just accept the somewhat not ideal
nesting as it technically is not a real issue?


-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more
to do
with the internals of the class (MockRecordMetadata,
CapturedPunctuator and
CapturedForward).

However, I do see your point, I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext, some of the
internal
classes should also be extracted i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax 
wrote:


Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few time already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are setup, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext
 implements FixedKeyProcessorContext,
RecordCollector.Supplier


Of course, if there is code we can share between both mock-context we
should so this, but it should not leak into the public API?


-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on




https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey











Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Matthias J. Sax
117f: Good point by Bruno. We should check for this, and could have an 
additional `INVALID_STANDBY_TASK` error code?



-Matthias

On 5/3/24 5:52 AM, Guozhang Wang wrote:

Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna  wrote:


Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough to
avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
the information whether a task is stateless or stateful into the task
assignor. However, the task assignor can return a standby task for a
stateless task which is inconsistent.

Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.

nit:
The titles of some code blocks in the KIP are not consistent with their
content, e.g., KafkaStreamsState <-> NodeState


Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:

Thanks Sophie. My bad. You are of course right about `TaskAssignment`
and the StreamsPartitionAssignor's responsibitliy to map tasks of a
instance to consumers. When I wrote my reply, I forgot about this detail.

Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to
figure it
out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types
altogether.
Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and
"Read-Only
APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return
type
being a Map rather than a
TaskAssignment
is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the
rack-aware
optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we
return a
TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it
seem
like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman

wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of
anything
better.  I like your proposals though, will update the KIP names.
I'll also
add a "NONE" option as well -- much better than just passing in null
for no
error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


   Would also be an error if assigned to two consumers of the same
client...

Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients,
it's not
responsible for the assignment of tasks to consumers within a
KafkaStreams.
It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task
assigned to a
single KAfkaStreams client, and then somehow ended up assigning that
task
to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this
case in
the OVERLAPPING_CLIENT error (or as it's now called, ACTIVE_TASK_
ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that
caused
the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating
unfixable
issues due to the TaskAssignor itself returning an invalid
assignment. The
phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just

Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Matthias J. Sax
101: Yes, but what I am saying is, that we don't need to flush the 
.position file to disk periodically, but only maintain it in main 
memory, and only write it to disk on close() to preserve it across 
restarts. This way, it would never be ahead, but might only lag? But 
with my better understanding about (102) it might be mood anyway...



102: Thanks for clarifying. Looked into the code now. Makes sense. Might 
be something to be worth calling out explicitly in the KIP writeup. -- 
Now that I realize that the position is tracked inside the store (not 
outside as the changelog offsets) it makes much more sense to pull 
position into RocksDB itself. In the end, it's actually a "store 
implementation" detail how it tracks the position (and kinda leaky 
abstraction currently, that we re-use the checkpoint file mechanism to 
track it and flush to disk).



200: I was thinking about this a little bit more, and maybe it's not too 
bad? When KS starts up, we could upon all stores we find on local disk 
pro-actively, and keep them all open until the first rebalance finishes: 
For tasks we get assigned, we hand in the already opened store (this 
would amortize the cost to open the store before the rebalance) and for 
non-assigned tasks, we know the offset information won't change and we 
could just cache it in-memory for later reuse (ie, next rebalance) and 
close the store to free up resources? -- Assuming that we would get a 
large percentage of opened stores assigned as tasks anyway, this could work?



-Matthias

On 5/3/24 1:29 AM, Bruno Cadonna wrote:

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data structure. 
If Kafka Streams writes its position to the .position file during a 
commit and a crash happens before RocksDB persist the memtable then the 
position in the .position file is ahead of the persisted offset. If IQ 
is done between the crash and the state store fully restored the 
changelog, the position might tell IQ that the state store is more 
up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map passed 
via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated in 
the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the offsets 
managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure 
I do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted 
in the .position file when the state store closes. 


This contradicts the KIP:

 these position offsets will be stored in RocksDB, in the same column 
family as the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening 
RocksDB stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a 
store is close()d. 


It seems, having the offset inside RocksDB does not help us at all? In 
the end, when we 

Re: [DISCUSS] KIP-1041: Drop `offsets.commit.required.acks` config in 4.0 (deprecate in 3.8)

2024-05-03 Thread Federico Valeri
Hi David, I can't think about any valid use case where changing the
default value would be useful, and your motivation is convincing.
Thanks.

On Fri, May 3, 2024 at 6:01 AM Andrew Schofield
 wrote:
>
> Hi David,
> I think this KIP is a very good idea. It would be good to get rid of this 
> cruft.
>
> Thanks,
> Andrew
>
> > On 2 May 2024, at 18:54, David Jacot  wrote:
> >
> > Hi folks,
> >
> > I have put together a very small KIP to
> > deprecate offsets.commit.required.acks in 3.8 and remove it in 4.0. See the
> > motivation for the reason.
> >
> > KIP: https://cwiki.apache.org/confluence/x/9YobEg
> >
> > Please let me know what you think.
> >
> > Best,
> > David
>


Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception

2024-05-03 Thread Matthias J. Sax

+1 (binding)

On 5/3/24 8:52 AM, Federico Valeri wrote:

Hi Fred, this is a useful addition.

+1 non binding

Thanks

On Fri, May 3, 2024 at 4:11 PM Andrew Schofield
 wrote:


Hi Fred,
Thanks for the KIP. It’s turned out nice and elegant I think. Definitely a 
worthwhile improvement.

+1 (non-binding)

Thanks,
Andrew


On 30 Apr 2024, at 14:02, Frédérik Rouleau  
wrote:

Hi all,

As there is no more activity for a while on the discuss thread, I think we
can start a vote.
The KIP is available on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception


If you have some feedback or suggestions, please participate to the
discussion thread:
https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5

Best regards,
Fred




Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-03 Thread Matthias J. Sax
What about (106) to unify both exiting callback methods of 
`ProductionExceptionHandler` into a single one, instead of adding two 
new ones?


Damien's last reply about it was:


I will think about unifying, I do agree it would be cleaner.


There was not follow up on this question, and the KIP right now still 
proposes to add two new methods, which I believe we could (should?) 
unify to:


default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, 
  final ProducerRecord record,

  final Exception exception) {


Ie, we drop the generics `` on `ProducerRecord` what 
allows you to also pass in a non-serialized ProducerRecord of any type 
for the serialization error case.


Btw: wondering if we also want to pass in a flag/enum about key vs value 
serialization error similar to what was proposed in KIP-1036? The only 
"oddity" would be, that we call the handler other error cases, too, not 
just for serialization exceptions. But we wculd tackle this by 
introducing a new class `RecordSerializationException` which would 
include the flag and would ensure that KS hands this exception into the 
handler. This would keep the handler interface/method itself clean.



-Matthias




On 5/3/24 2:15 AM, Loic Greffier wrote:

Hi Bruno,

Good catch, KIP has been updated accordingly.

Regards,
Loïc


Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception

2024-05-03 Thread Federico Valeri
Hi Fred, this is a useful addition.

+1 non binding

Thanks

On Fri, May 3, 2024 at 4:11 PM Andrew Schofield
 wrote:
>
> Hi Fred,
> Thanks for the KIP. It’s turned out nice and elegant I think. Definitely a 
> worthwhile improvement.
>
> +1 (non-binding)
>
> Thanks,
> Andrew
>
> > On 30 Apr 2024, at 14:02, Frédérik Rouleau  
> > wrote:
> >
> > Hi all,
> >
> > As there is no more activity for a while on the discuss thread, I think we
> > can start a vote.
> > The KIP is available on
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception
> >
> >
> > If you have some feedback or suggestions, please participate to the
> > discussion thread:
> > https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5
> >
> > Best regards,
> > Fred
>


[jira] [Created] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer

2024-05-03 Thread Lianet Magrans (Jira)
Lianet Magrans created KAFKA-16665:
--

 Summary: Fail to get partition's position from within 
onPartitionsAssigned callback in new consumer 
 Key: KAFKA-16665
 URL: https://issues.apache.org/jira/browse/KAFKA-16665
 Project: Kafka
  Issue Type: Task
  Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Lianet Magrans
Assignee: Lianet Magrans
 Fix For: 3.8.0


If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1038: Add Custom Error Handler to Producer

2024-05-03 Thread Alieh Saeedi
Hi all,


A summary of the KIP and the discussions:


The KIP introduces a handler interface for Producer in order to handle two
exceptions: RecordTooLargeException and UnknownTopicOrPartitionException.
The handler handles the exceptions per-record.


- Do we need this handler?  [Motivation and Examples sections]


RecordTooLargeException: 1) In transactions, the producer collects multiple
records in batches. Then a RecordTooLargeException related to a single
record leads to failing the entire batch. A custom exception handler in
this case may decide on dropping the record and continuing the processing.
See Example 1, please. 2) More over, in Kafka Streams, a record that is too
large is a poison pill record, and there is no way to skip over it. A
handler would allow us to react to this error inside the producer, i.e.,
local to where the error happens, and thus simplify the overall code
significantly. Please read the Motivation section for more explanation.


UnknownTopicOrPartitionException: For this case, the producer handles this
exception internally and only issues a WARN log about missing metadata and
retries internally. Later, when the producer hits "deliver.timeout.ms"  it
throws a TimeoutException, and the user can only blindly retry, which may
result in an infinite retry loop. The thrown TimeoutException "cuts" the
connection to the underlying root cause of missing metadata (which could
indeed be a transient error but is persistent for a non-existing topic).
Thus, there is no programmatic way to break the infinite retry loop. Kafka
Streams also blindly retries for this case, and the application gets stuck.



- Having interface vs configuration option: [Motivation, Examples, and
Rejected Alternatives sections]

Our solution is introducing an interface due to the full flexibility that
it offers. Sometimes users, especially Kafka Streams ones, determine the
handler's behaviour based on the situation. For example, f
acing UnknownTopicOrPartitionException*, *the user may want to raise an
error for some topics but retry it for other topics. Having a configuration
option with a fixed set of possibilities does not serve the user's
needs. See Example 2, please.


- Note on RecordTooLargeException: [Public Interfaces section]

If the custom handler decides on SWALLOW for RecordTooLargeException, then
this record will not be a part of the batch of transactions and will also
not be sent to the broker in non-transactional mode. So no worries about
getting a RecordTooLargeException from the broker in this case, as the
record will never ever be sent to the broker. SWALLOW means drop the record
and continue/swallow the error.


- What if the handle() method implements RETRY for RecordTooLargeException?
[Proposed Changes section]

We have to limit the user to only have FAIL or SWALLOW for
RecordTooLargeException. Actually, RETRY must be equal to FAIL. This is
well documented/informed in javadoc.



- What if the handle() method of the handler throws an exception?

The handler is expected to have correct code. If it throws an exception,
everything fails.



This is a PoC PR  ONLY for
RecordTooLargeException. The code changes related to
UnknownTopicOrPartitionException will be added to this PR LATER.


Looking forward to your feedback again.


Cheers,

Alieh

On Thu, Apr 25, 2024 at 11:46 PM Kirk True  wrote:

> Hi Alieh,
>
> Thanks for the updates!
>
> Comments inline...
>
>
> > On Apr 25, 2024, at 1:10 PM, Alieh Saeedi 
> wrote:
> >
> > Hi all,
> >
> > Thanks a lot for the constructive feedbacks!
> >
> >
> >
> > Addressing some of the main concerns:
> >
> >
> > - The `RecordTooLargeException` can be thrown by broker, producer and
> > consumer. Of course, the `ProducerExceptionHandler` interface is
> introduced
> > to affect only the exceptions thrown from the producer. This KIP very
> > specifically means to provide a possibility to manage the
> > `RecordTooLargeException` thrown from the Producer.send() method. Please
> > see “Proposed Changes” section for more clarity. I investigated the issue
> > there thoroughly. I hope it can explain the concern about how we handle
> the
> > errors as well.
> >
> >
> >
> > - The problem with Callback: Methods of Callback are called when the
> record
> > sent to the server is acknowledged, while this is not the desired time
> for
> > all exceptions. We intend to handle exceptions beforehand.
>
> I guess it makes sense to keep the expectation for when Callback is
> invoked as-is vs. shoehorning more into it.
>
> > - What if the custom handler returns RETRY for
> `RecordTooLargeException`? I
> > assume changing the producer configuration at runtime is possible. If so,
> > RETRY for a too large record is valid because maybe in the next try, the
> > too large record is not poisoning any more. I am not 100% sure about the
> > technical details, though. Otherwise, we can consider the RETRY as FAIL
> for
> > this exception. Another 

[jira] [Created] (KAFKA-16664) Re-add EventAccumulator.take(timeout)

2024-05-03 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16664:


 Summary: Re-add EventAccumulator.take(timeout)
 Key: KAFKA-16664
 URL: https://issues.apache.org/jira/browse/KAFKA-16664
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim


[https://github.com/apache/kafka/pull/15835] should be used with a timeout in 
EventAccumulator#take. We added a commit to remove the timeout, we should 
revert it



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-03 Thread Justine Olshan
Hey folks,

I shared this with Omnia offline:
One concern I have is with the length of time we keep "seen" producer IDs.
It seems like the default is 1 hour. If a producer produces every 2 hours
or so, and we are hitting the limit, it seems like we will throttle it even
though we've seen it before and have state for it on the server. Then, it
seems like we will have to wait for the natural expiration of producer ids
(via producer.id.expiration.ms) before we allow new or idle producers to
join again without throttling. I think this proposal is a step in the right
direction when it comes to throttling the "right" clients, but I want to
make sure we have reasonable defaults. Keep in mind that idempotent
producers are the default, so most folks won't be tuning these values out
of the box.

As for Igor's questions about InitProducerId -- I think the main reason we
have avoided that solution is that there is no state stored for idempotent
producers when grabbing an ID. My concern there is either storing too much
state to track this or throttling before we need to.

Justine

On Thu, May 2, 2024 at 2:36 PM Claude Warren, Jr
 wrote:

> There is some question about whether or not we need the configuration
> options.  My take on them is as follows:
>
> producer.id.quota.window.num  No opinion.  I don't know what this is used
> for, but I suspect that there is a good reason to have it.  It is not used
> within the Bloom filter caching mechanism
> producer.id.quota.window.size.seconds Leave it as it is one of the most
> effective ways to tune the filter and determines how long a PID is
> recognized.
> producer.id.quota.cache.cleanup.scheduler.interval.ms  Remove it unless
> there is another use for it.   We can get a better calculation for
> internals.
> producer.id.quota.cache.layer.count Leave it as it is one of the most
> effective ways to tune the filter.
> producer.id.quota.cache.false.positive.rate Replace it with a constant,  I
> don't think any other Bloom filter solution provides access to this knob
> for end users.
> producer_ids_rate Leave this one, it is critical for reasonable operation.
>


Re: [VOTE] KIP-1036: Extend RecordDeserializationException exception

2024-05-03 Thread Andrew Schofield
Hi Fred,
Thanks for the KIP. It’s turned out nice and elegant I think. Definitely a 
worthwhile improvement.

+1 (non-binding)

Thanks,
Andrew

> On 30 Apr 2024, at 14:02, Frédérik Rouleau  
> wrote:
>
> Hi all,
>
> As there is no more activity for a while on the discuss thread, I think we
> can start a vote.
> The KIP is available on
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1036%3A+Extend+RecordDeserializationException+exception
>
>
> If you have some feedback or suggestions, please participate to the
> discussion thread:
> https://lists.apache.org/thread/or85okygtfywvnsfd37kwykkq5jq7fy5
>
> Best regards,
> Fred



Re: [DISCUSS] Apache Kafka 3.8.0 release

2024-05-03 Thread Josep Prat
Hi Kafka developers!
I just wanted to remind you all of the upcoming relevant dates for Kafka
3.8.0:
- KIP freeze is on May 15th (this is in a little less than 2 weeks)
- Feature freeze is on May 29th (this is in a little more than 25 days).

If there is a KIP you really want to have in the 3.8 series, now is the
time to make the last push. Once the deadline for KIP freeze is over I'll
update the release plan with the final list of KIPs accepted and that may
make it to the release.

Best!

On Wed, Mar 6, 2024 at 10:40 AM Josep Prat  wrote:

> Hi all,
>
> Thanks for your support. I updated the skeleton release plan created by
> Colin. You can find it here:
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+3.8.0
>
> Our last release stumbled upon some problems while releasing and was
> delayed by several weeks, so I won't try to shave some weeks from our plan
> for 3.8.0 (we might end up having delays again). Please raise your concerns
> if you don't agree with the proposed dates.
>
> The current proposal on dates are:
>
>- KIP Freeze: *15nd May *(Wednesday)
>   - A KIP must be accepted by this date in order to be considered for
>   this release. Note, any KIP that may not be implemented in a week, or 
> that
>   might destabilize the release, should be deferred.
>- Feature Freeze: *29th May *(Wednesday)
>   - *major features merged & working on stabilisation, minor features
>   have PR, release branch cut; anything not in this state will be
>   automatically moved to the next release in JIRA*
>- Code Freeze: *12th June *(Wednesday)
>- At least two weeks of stabilization will follow Code Freeze, meaning
>we expect to release *no earlier* than *June 26th*. We will move as
>fast as we can, and aim for completion the earliest we can in June.
>
> I went through the KIP list, and found that these are the ones that might
> make it into the release:
> KIP-853: KRaft Controller Membership Changes (still under discussion)
> KIP-942: Add Power(ppc64le) support
> KIP-966: Eligible Leader Replicas
> KIP-974: Docker Image for GraalVM based Native Kafka Broker
> KIP-977: Partition-Level Throughput Metrics
> KIP-993: Allow restricting files accessed by File and Directory
> ConfigProviders
> KIP-994: Minor Enhancements to ListTransactions and DescribeTransactions
> APIs
> KIP-996: Pre-Vote
> KIP-1004: Enforce tasks.max property in Kafka Connect
> KIP-1005: Expose EarliestLocalOffset and TieredOffset
> KIP-1007: Introduce Remote Storage Not Ready Exception
> KIP-1019: Expose method to determine Metric Measurability
>
> Please review the plan and provide any additional information or updates
> regarding KIPs that target this release version (3.8).
> If you have authored any KIPs that have an inaccurate status in the list,
> or are not in the list and should be, or are in the list and should not be
> - please share it in this thread so that I can keep the document accurate
> and up to date.
>
> Looking forward to your feedback.
>
> Best,
>
> On Wed, Feb 28, 2024 at 10:07 AM Satish Duggana 
> wrote:
>
>> Thanks Josep, +1.
>>
>> On Tue, 27 Feb 2024 at 17:29, Divij Vaidya 
>> wrote:
>> >
>> > Thank you for volunteering Josep. +1 from me.
>> >
>> > --
>> > Divij Vaidya
>> >
>> >
>> >
>> > On Tue, Feb 27, 2024 at 9:35 AM Bruno Cadonna 
>> wrote:
>> >
>> > > Thanks Josep!
>> > >
>> > > +1
>> > >
>> > > Best,
>> > > Bruno
>> > >
>> > > On 2/26/24 9:53 PM, Chris Egerton wrote:
>> > > > Thanks Josep, I'm +1 as well.
>> > > >
>> > > > On Mon, Feb 26, 2024 at 12:32 PM Justine Olshan
>> > > >  wrote:
>> > > >
>> > > >> Thanks Joesp. +1 from me.
>> > > >>
>> > > >> On Mon, Feb 26, 2024 at 3:37 AM Josep Prat
>> > > > >
>> > > >> wrote:
>> > > >>
>> > > >>> Hi all,
>> > > >>>
>> > > >>> I'd like to volunteer as release manager for the Apache Kafka
>> 3.8.0
>> > > >>> release.
>> > > >>> If there are no objections, I'll start building a release plan (or
>> > > >> adapting
>> > > >>> the one Colin made some weeks ago) in the wiki in the next days.
>> > > >>>
>> > > >>> Thank you.
>> > > >>>
>> > > >>> --
>> > > >>> [image: Aiven] 
>> > > >>>
>> > > >>> *Josep Prat*
>> > > >>> Open Source Engineering Director, *Aiven*
>> > > >>> josep.p...@aiven.io   |   +491715557497
>> > > >>> aiven.io    |   <
>> > > >> https://www.facebook.com/aivencloud
>> > > 
>> > > >>>   <
>> > > >>> https://twitter.com/aiven_io>
>> > > >>> *Aiven Deutschland GmbH*
>> > > >>> Alexanderufer 3-7, 10117 Berlin
>> > > >>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> > > >>> Amtsgericht Charlottenburg, HRB 209739 B
>> > > >>>
>> > > >>
>> > > >
>> > >
>>
>
>
> --
> [image: Aiven] 
>
> *Josep Prat*
> Open Source Engineering Director, *Aiven*
> josep.p...@aiven.io   |   +491715557497
> aiven.io    |
> 
> 

[jira] [Created] (KAFKA-16663) CoordinatorRuntime write timer tasks should be cancelled once HWM advances

2024-05-03 Thread Jeff Kim (Jira)
Jeff Kim created KAFKA-16663:


 Summary: CoordinatorRuntime write timer tasks should be cancelled 
once HWM advances
 Key: KAFKA-16663
 URL: https://issues.apache.org/jira/browse/KAFKA-16663
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jeff Kim
Assignee: Jeff Kim


Otherwise, we pile up the number of timer tasks



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-03 Thread Shashwat Pandey
Hi Matthias,

Sorry this fell out of my radar for a bit.

Revisiting the topic, I think you’re right and we accept the duplicated
nesting as an appropriate solution to not affect the larger public API.

I can update my PR with the change.

Regards,
Shashwat Pandey


On Wed, May 1, 2024 at 11:00 PM Matthias J. Sax  wrote:

> Any updates on this KIP?
>
> On 3/28/24 4:11 AM, Matthias J. Sax wrote:
> > It seems that `MockRecordMetadata` is a private class, and thus not part
> > of the public API. If there are any changes required, we don't need to
> > discuss on the KIP.
> >
> >
> > For `CapturedPunctuator` and `CapturedForward` it's a little bit more
> > tricky. My gut feeling is, that the classes might not need to be
> > changed, but if we use them within `MockProcessorContext` and
> > `MockFixedKeyProcessorContext` it might be weird to keep the current
> > nesting... The problem I see is, that it's not straightforward how to
> > move the classes w/o breaking compatibility, nor if we duplicate them as
> > standalone classes w/o a larger "splash radius". (We would need to add
> > new overloads for MockProcessorContext#scheduledPunctuators() and
> > MockProcessorContext#forwarded()).
> >
> > Might be good to hear from others if we think it's worth this larger
> > changes to get rid of the nesting, or just accept the somewhat not ideal
> > nesting as it technically is not a real issue?
> >
> >
> > -Matthias
> >
> >
> > On 3/15/24 1:47 AM, Shashwat Pandey wrote:
> >> Thanks for the feedback Matthias!
> >>
> >> The reason I proposed the extension of MockProcessorContext was more
> >> to do
> >> with the internals of the class (MockRecordMetadata,
> >> CapturedPunctuator and
> >> CapturedForward).
> >>
> >> However, I do see your point, I would then think to split
> >> MockProcessorContext and MockFixedKeyProcessorContext, some of the
> >> internal
> >> classes should also be extracted i.e. MockRecordMetadata,
> >> CapturedPunctuator and probably a new CapturedFixedKeyForward.
> >>
> >> Let me know what you think!
> >>
> >>
> >> Regards,
> >> Shashwat Pandey
> >>
> >>
> >> On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax 
> >> wrote:
> >>
> >>> Thanks for the KIP Shashwat. Closing this testing gap is great! It did
> >>> come up a few time already...
> >>>
> >>> One question: why do you propose to `extend MockProcessorContext`?
> >>>
> >>> Given how the actual runtime context classes are setup, it seems that
> >>> the regular context and fixed-key-context are distinct, and thus I
> >>> believe both mock-context classes should be distinct, too?
> >>>
> >>> What I mean is that FixedKeyProcessorContext does not extend
> >>> ProcessorContext. Both classes have a common parent ProcessINGContext
> >>> (note the very similar but different names), but they are "siblings"
> >>> only, so why make the mock processor a parent-child relationship?
> >>>
> >>> It seems better to do
> >>>
> >>> public class MockFixedKeyProcessorContext
> >>> implements FixedKeyProcessorContext,
> >>>RecordCollector.Supplier
> >>>
> >>>
> >>> Of course, if there is code we can share between both mock-context we
> >>> should so this, but it should not leak into the public API?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>>
> >>> On 3/11/24 5:21 PM, Shashwat Pandey wrote:
>  Hi everyone,
> 
>  I would like to start the discussion on
> 
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext
> 
>  This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
>  library.
> 
>  Regards,
>  Shashwat Pandey
> 
> >>>
> >>
>


Re: [DISCUSS] KIP-1036: Extend RecordDeserializationException exception

2024-05-03 Thread Frédérik Rouleau
Hi Sophie,

I have updated the KIP and the PR.

Regards,


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Guozhang Wang
Hi Sophie,

Re: As for the return type of the TaskAssignmentUtils, I think that
makes sense. LGTM.

On Fri, May 3, 2024 at 2:26 AM Bruno Cadonna  wrote:
>
> Hi Sophie,
>
> 117f:
> I think, removing the STATEFUL and STATELESS types is not enough to
> avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes
> the information whether a task is stateless or stateful into the task
> assignor. However, the task assignor can return a standby task for a
> stateless task which is inconsistent.
>
> Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.
>
> nit:
> The titles of some code blocks in the KIP are not consistent with their
> content, e.g., KafkaStreamsState <-> NodeState
>
>
> Best,
> Bruno
>
> On 5/3/24 2:43 AM, Matthias J. Sax wrote:
> > Thanks Sophie. My bad. You are of course right about `TaskAssignment`
> > and the StreamsPartitionAssignor's responsibitliy to map tasks of a
> > instance to consumers. When I wrote my reply, I forgot about this detail.
> >
> > Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?
> >
> > Otherwise LGTM.
> >
> >
> > -Matthias
> >
> > On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:
> >> Guozhang:
> >>
> >> 117. All three additions make sense to me. However, while thinking about
> >> how users would actually produce an assignment, I realized that it seems
> >> silly to make it their responsibility to distinguish between a stateless
> >> and stateful task when they return the assignment. The
> >> StreamsPartitionAssignor already knows which tasks are stateful vs
> >> stateless, so there's no need to add this extra step for users to
> >> figure it
> >> out themselves, and potentially make a mistake.
> >>
> >> 117f: So, rather than add a new error type for "inconsistent task types",
> >> I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
> >> and "STANDBY", and remove the "STATEFUL" and "STATELESS" types
> >> altogether.
> >> Any objections?
> >>
> >> -
> >>
> >> -Thanks, fixed the indentation of headers under "User APIs" and
> >> "Read-Only
> >> APIs"
> >>
> >> -As for the return type of the TaskAssignmentUtils methods, I don't
> >> personally feel too strongly about this, but the reason for the return
> >> type
> >> being a Map rather than a
> >> TaskAssignment
> >> is because they are meant to be used iteratively/to create a part of the
> >> full assignment, and not necessarily a full assignment for each. Notice
> >> that they all have an input parameter of the same type: Map >> KafkaStreamsAssignment>. The idea is you can take the output of any of
> >> these and pass it in to another to generate or optimize another piece of
> >> the overall assignment. For example, if you want to perform the
> >> rack-aware
> >> optimization on both active and standby tasks, you would need to call
> >> #optimizeRackAwareActiveTasks and then forward the output to
> >> #optimizeRackAwareStandbyTasks to get the final assignment. If we
> >> return a
> >> TaskAssignment, it will usually need to be unwrapped right away. Perhaps
> >> more importantly, I worry that returning a TaskAssignment will make it
> >> seem
> >> like each of these utility methods return a "full" and final assignment
> >> that can just be returned as-is from the TaskAssignor's #assign method.
> >> Whereas they are each just a single step in the full assignment process,
> >> and not the final product. Does that make sense?
> >>
> >> On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman
> >> 
> >> wrote:
> >>
> >>> Matthias:
> >>>
> >>> Thanks for the naming suggestions for the error codes. I was
> >>> definitely not happy with my original naming but couldn't think of
> >>> anything
> >>> better.  I like your proposals though, will update the KIP names.
> >>> I'll also
> >>> add a "NONE" option as well -- much better than just passing in null
> >>> for no
> >>> error.
> >>>
>  OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
>  same active task
> >>>
> >>>   Would also be an error if assigned to two consumers of the same
> >>> client...
>  Needs to be rephrased.
> >>>
> >>>
> >>> Well the TaskAssignor only assigns tasks to KafkaStreams clients,
> >>> it's not
> >>> responsible for the assignment of tasks to consumers within a
> >>> KafkaStreams.
> >>> It would be a bug in the StreamsPartitionAssignor if it received a valid
> >>> assignment from the TaskAssignor with only one copy of a task
> >>> assigned to a
> >>> single KAfkaStreams client, and then somehow ended up assigning that
> >>> task
> >>> to multiple consumers on the KafkaStreams client. It wouldn't be the
> >>> TaskAssignor's fault so imo it would not make sense to include this
> >>> case in
> >>> the OVERLAPPING_CLIENT error (or as it's now called, ACTIVE_TASK_
> >>> ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that
> >>> caused
> >>> the StreamsPartitionAssignor to assign a task to multiple consumers, it
> >>> presumably wouldn't even notice 

Re: [DISCUSS] KIP-1042 support for wildcard when creating new acls

2024-05-03 Thread Haruki Okada
Hi, Murali.

First, could you add the KIP-1042 to the index (
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals)
as well so that everyone can find it easily?

I took a look at the KIP, then I have 2 questions:

1. Though the new MATCH resource pattern type may reduce the effort of
adding ACLs in some cases, do you have any concrete use case you are in
mind? (When prefixed ACL was introduced in KIP-290, there was a use-case
that using it for implementing multi-tenancy)

2. As you may know, ACL lookup is in the hot-path which the performance is
very important. (
https://github.com/apache/kafka/blob/240243b91d69c2b65b5e456065fdcce90c121b04/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala#L539).
Do you have ideas how do we update `matchingAcls` to support MATCH-type ACL
without introducing performance issue?


Thanks,

2024年5月3日(金) 19:51 Claude Warren, Jr :

> As I wrote in [1], the ACL evaluation algorithm needs to be specified with
> respect to the specificity of the pattern so that we know exactly which if
> *-accounts-* takes precedence over nl-accounts-* or visa versa.
>
> I think that we should spell out that precedence is evaluated as follows:
>
> 1. Remove patterns that do not match
> 2. More specific patterns take precedence over less specific patterns
> 3. for patterns of the same precedence DENY overrides ALLOW
>
> Determining specificity:
>
> Specificity is based on the Levenshtein distance between the pattern and
> the text being evaluated. The lower the distance the more specific the
> rule.
> Using the topic name: nl-accounts-localtopic we can evaluate the
> Levenshtein distance for various patterns.
> nl-accounts-localtopic = 0
> *-accounts-localtopic = 2
> nl-accounts-local* = 5
> *-accounts-local* = 7
> nl-accounts-* = 10
> *-accounts-* = 12
>
> In the special case of matching principles User matches are more specific
> than Group matches.
>
> I don't know if this should be added to KIP-1042 or presented as a new KIP.
>
> Claude
>
> [1] https://lists.apache.org/thread/0l88tkbxq3ol9rnx0ljnmswj5y6pho1f
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1042%3A+Support+for+wildcard+when+creating+new+acls
> >
>
> On Fri, May 3, 2024 at 12:18 PM Claude Warren  wrote:
>
> > Took me awhile to find it but the link to the KIP is
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1042%3A+Support+for+wildcard+when+creating+new+acls
> >
> > On Fri, May 3, 2024 at 10:13 AM Murali Basani 
> > wrote:
> >
> > > Hello,
> > >
> > > I'd like to propose a suggestion to our resource patterns in Kafka
> ACLs.
> > >
> > > Currently, when adding new ACLs in Kafka, we have two types of resource
> > > patterns for topics:
> > >
> > >- LITERAL
> > >- PREFIXED
> > >
> > > However, when it comes to listing or removing ACLs, we have a couple
> more
> > > options:
> > >
> > >- MATCH
> > >- ANY (will match any pattern type)
> > >
> > >
> > > If we can extend creating acls as well with 'MATCH' pattern type, it
> > would
> > > be very beneficial. Even though this kind of acl should be created with
> > > utmost care, it will help organizations streamline their ACL management
> > > processes.
> > >
> > > Example scenarios :
> > >
> > > Let's say we need to create ACLs for the following six topics:
> > > nl-accounts-localtopic, nl-accounts-remotetopic,
> de-accounts-localtopic,
> > > de-accounts-remotetopic, cz-accounts-localtopic,
> cz-accounts-remotetopic
> > >
> > > Currently, we achieve this using existing functionality by creating
> three
> > > prefixed ACLs as shown below:
> > >
> > > kafka-acls --bootstrap-server localhost:9092 \
> > > > --add \
> > > > --allow-principal
> > > >
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > > \
> > > > --producer \
> > > > --topic nl-accounts- \
> > > > --resource-pattern-type prefixed
> > >
> > >
> > > kafka-acls --bootstrap-server localhost:9092 \
> > > > --add \
> > > > --allow-principal
> > > >
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > > \
> > > > --producer \
> > > > --topic de-accounts- \
> > > > --resource-pattern-type prefixed
> > >
> > >
> > > kafka-acls --bootstrap-server localhost:9092 \
> > > > --add \
> > > > --allow-principal
> > > >
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > > \
> > > > --producer \
> > > > --topic cz-accounts- \
> > > > --resource-pattern-type prefixed
> > >
> > >
> > > However, if we had the 'MATCH' pattern type available, we could
> > accomplish
> > > this with a single ACL, as illustrated here:
> > >
> > > kafka-acls --bootstrap-server localhost:9092 \
> > > > --add \
> > > > --allow-principal
> > > >
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > > \
> > > > --producer \
> > > > --topic *-accounts-* \
> > > > --resource-pattern-type match
> > >
> > >
> > >
> > > This 

Re: [DISCUSS] KIP-1042 support for wildcard when creating new acls

2024-05-03 Thread Claude Warren, Jr
As I wrote in [1], the ACL evaluation algorithm needs to be specified with
respect to the specificity of the pattern so that we know exactly which if
*-accounts-* takes precedence over nl-accounts-* or visa versa.

I think that we should spell out that precedence is evaluated as follows:

1. Remove patterns that do not match
2. More specific patterns take precedence over less specific patterns
3. for patterns of the same precedence DENY overrides ALLOW

Determining specificity:

Specificity is based on the Levenshtein distance between the pattern and
the text being evaluated. The lower the distance the more specific the
rule.
Using the topic name: nl-accounts-localtopic we can evaluate the
Levenshtein distance for various patterns.
nl-accounts-localtopic = 0
*-accounts-localtopic = 2
nl-accounts-local* = 5
*-accounts-local* = 7
nl-accounts-* = 10
*-accounts-* = 12

In the special case of matching principles User matches are more specific
than Group matches.

I don't know if this should be added to KIP-1042 or presented as a new KIP.

Claude

[1] https://lists.apache.org/thread/0l88tkbxq3ol9rnx0ljnmswj5y6pho1f


On Fri, May 3, 2024 at 12:18 PM Claude Warren  wrote:

> Took me awhile to find it but the link to the KIP is
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1042%3A+Support+for+wildcard+when+creating+new+acls
>
> On Fri, May 3, 2024 at 10:13 AM Murali Basani 
> wrote:
>
> > Hello,
> >
> > I'd like to propose a suggestion to our resource patterns in Kafka ACLs.
> >
> > Currently, when adding new ACLs in Kafka, we have two types of resource
> > patterns for topics:
> >
> >- LITERAL
> >- PREFIXED
> >
> > However, when it comes to listing or removing ACLs, we have a couple more
> > options:
> >
> >- MATCH
> >- ANY (will match any pattern type)
> >
> >
> > If we can extend creating acls as well with 'MATCH' pattern type, it
> would
> > be very beneficial. Even though this kind of acl should be created with
> > utmost care, it will help organizations streamline their ACL management
> > processes.
> >
> > Example scenarios :
> >
> > Let's say we need to create ACLs for the following six topics:
> > nl-accounts-localtopic, nl-accounts-remotetopic, de-accounts-localtopic,
> > de-accounts-remotetopic, cz-accounts-localtopic, cz-accounts-remotetopic
> >
> > Currently, we achieve this using existing functionality by creating three
> > prefixed ACLs as shown below:
> >
> > kafka-acls --bootstrap-server localhost:9092 \
> > > --add \
> > > --allow-principal
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > \
> > > --producer \
> > > --topic nl-accounts- \
> > > --resource-pattern-type prefixed
> >
> >
> > kafka-acls --bootstrap-server localhost:9092 \
> > > --add \
> > > --allow-principal
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > \
> > > --producer \
> > > --topic de-accounts- \
> > > --resource-pattern-type prefixed
> >
> >
> > kafka-acls --bootstrap-server localhost:9092 \
> > > --add \
> > > --allow-principal
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > \
> > > --producer \
> > > --topic cz-accounts- \
> > > --resource-pattern-type prefixed
> >
> >
> > However, if we had the 'MATCH' pattern type available, we could
> accomplish
> > this with a single ACL, as illustrated here:
> >
> > kafka-acls --bootstrap-server localhost:9092 \
> > > --add \
> > > --allow-principal
> > >
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > > \
> > > --producer \
> > > --topic *-accounts-* \
> > > --resource-pattern-type match
> >
> >
> >
> > This pattern closely resembles PREFIXED but offers broader allow/deny
> > rules.
> >
> > Implementing this change could significantly reduce the effort in several
> > acl management processes.
> >
> > I welcome your thoughts and any concerns you may have regarding this
> > proposal.
> >
> > Thanks,
> > Murali
> >
>
>
> --
> LinkedIn: http://www.linkedin.com/in/claudewarren
>


[jira] [Created] (KAFKA-16662) UnwritableMetadataException: Metadata has been lost

2024-05-03 Thread Tobias Bohn (Jira)
Tobias Bohn created KAFKA-16662:
---

 Summary: UnwritableMetadataException: Metadata has been lost
 Key: KAFKA-16662
 URL: https://issues.apache.org/jira/browse/KAFKA-16662
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.7.0
 Environment: Docker Image (bitnami/kafka:3.7.0)
via Docker Compose
Reporter: Tobias Bohn
 Attachments: log.txt

Hello,
First of all: I am new to this Jira and apologize if anything is set or 
specified incorrectly. Feel free to advise me.

We currently have an error in our test system, which unfortunately I can't 
solve, because I couldn't find anything related to it. No solution could be 
found via the mailing list either.
The error occurs when we want to start up a node. The node runs using Kraft and 
is both a controller and a broker. The following error message appears at 
startup:
{code:java}
kafka  | [2024-04-16 06:18:13,707] ERROR Encountered fatal fault: Unhandled 
error initializing new publishers 
(org.apache.kafka.server.fault.ProcessTerminatingFaultHandler)
kafka  | org.apache.kafka.image.writer.UnwritableMetadataException: Metadata 
has been lost because the following could not be represented in metadata 
version 3.5-IV2: the directory assignment state of one or more replicas
kafka  |        at 
org.apache.kafka.image.writer.ImageWriterOptions.handleLoss(ImageWriterOptions.java:94)
kafka  |        at 
org.apache.kafka.metadata.PartitionRegistration.toRecord(PartitionRegistration.java:391)
kafka  |        at org.apache.kafka.image.TopicImage.write(TopicImage.java:71)
kafka  |        at org.apache.kafka.image.TopicsImage.write(TopicsImage.java:84)
kafka  |        at 
org.apache.kafka.image.MetadataImage.write(MetadataImage.java:155)
kafka  |        at 
org.apache.kafka.image.loader.MetadataLoader.initializeNewPublishers(MetadataLoader.java:295)
kafka  |        at 
org.apache.kafka.image.loader.MetadataLoader.lambda$scheduleInitializeNewPublishers$0(MetadataLoader.java:266)
kafka  |        at 
org.apache.kafka.queue.KafkaEventQueue$EventContext.run(KafkaEventQueue.java:127)
kafka  |        at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.handleEvents(KafkaEventQueue.java:210)
kafka  |        at 
org.apache.kafka.queue.KafkaEventQueue$EventHandler.run(KafkaEventQueue.java:181)
kafka  |        at java.base/java.lang.Thread.run(Thread.java:840)
kafka exited with code 0 {code}
We use Docker to operate the cluster. The error occurred while we were trying 
to restart a node. All other nodes in the cluster are still running correctly.
If you need further information, please let us know. The complete log is 
attached to this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-1042 support for wildcard when creating new acls

2024-05-03 Thread Claude Warren
Took me awhile to find it but the link to the KIP is
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1042%3A+Support+for+wildcard+when+creating+new+acls

On Fri, May 3, 2024 at 10:13 AM Murali Basani 
wrote:

> Hello,
>
> I'd like to propose a suggestion to our resource patterns in Kafka ACLs.
>
> Currently, when adding new ACLs in Kafka, we have two types of resource
> patterns for topics:
>
>- LITERAL
>- PREFIXED
>
> However, when it comes to listing or removing ACLs, we have a couple more
> options:
>
>- MATCH
>- ANY (will match any pattern type)
>
>
> If we can extend creating acls as well with 'MATCH' pattern type, it would
> be very beneficial. Even though this kind of acl should be created with
> utmost care, it will help organizations streamline their ACL management
> processes.
>
> Example scenarios :
>
> Let's say we need to create ACLs for the following six topics:
> nl-accounts-localtopic, nl-accounts-remotetopic, de-accounts-localtopic,
> de-accounts-remotetopic, cz-accounts-localtopic, cz-accounts-remotetopic
>
> Currently, we achieve this using existing functionality by creating three
> prefixed ACLs as shown below:
>
> kafka-acls --bootstrap-server localhost:9092 \
> > --add \
> > --allow-principal
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > \
> > --producer \
> > --topic nl-accounts- \
> > --resource-pattern-type prefixed
>
>
> kafka-acls --bootstrap-server localhost:9092 \
> > --add \
> > --allow-principal
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > \
> > --producer \
> > --topic de-accounts- \
> > --resource-pattern-type prefixed
>
>
> kafka-acls --bootstrap-server localhost:9092 \
> > --add \
> > --allow-principal
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > \
> > --producer \
> > --topic cz-accounts- \
> > --resource-pattern-type prefixed
>
>
> However, if we had the 'MATCH' pattern type available, we could accomplish
> this with a single ACL, as illustrated here:
>
> kafka-acls --bootstrap-server localhost:9092 \
> > --add \
> > --allow-principal
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > \
> > --producer \
> > --topic *-accounts-* \
> > --resource-pattern-type match
>
>
>
> This pattern closely resembles PREFIXED but offers broader allow/deny
> rules.
>
> Implementing this change could significantly reduce the effort in several
> acl management processes.
>
> I welcome your thoughts and any concerns you may have regarding this
> proposal.
>
> Thanks,
> Murali
>


-- 
LinkedIn: http://www.linkedin.com/in/claudewarren


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-03 Thread Loic Greffier
Hi Bruno, 

Good catch, KIP has been updated accordingly.

Regards,
Loïc


Re: [VOTE] KIP-1033: Add Kafka Streams exception handler for exceptions occurring during processing

2024-05-03 Thread Bruno Cadonna

Hi Damien, Sébastien, and Loïc,

Thanks for the KIP!

+1 (binding)

Best,
Bruno


On 4/26/24 4:00 PM, Damien Gasparina wrote:

Hi all,

We would like to start a vote for KIP-1033: Add Kafka Streams
exception handler for exceptions occurring during processing

The KIP is available on
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1033%3A+Add+Kafka+Streams+exception+handler+for+exceptions+occurring+during+processing

If you have any suggestions or feedback, feel free to participate to
the discussion thread:
https://lists.apache.org/thread/1nhhsrogmmv15o7mk9nj4kvkb5k2bx9s

Best regards,
Damien Sebastien and Loic


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-03 Thread Bruno Cadonna

Hi,

the KIP looks great!

public static final String PROCESS_EXCEPTION_HANDLER_CLASS_CONFIG = 
"process.exception.handler".


needs to be changed to

public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = 
"processing.exception.handler".


The name of the constant has been already corrected in the code block 
but the actual name of the config (i.e., the content of the constant) 
has not been changed yet.



Best,
Bruno


On 5/3/24 10:35 AM, Sebastien Viale wrote:

Hi,
So, we all agree to revert to the regular Headers interface in 
ErrorHandlerContext.
We will update the KIP accordingly.
@Sophie => Yes, this is the last remaining question, and it has been open for 
voting since last week.
Thanks

Sébastien

De : Andrew Schofield 
Envoyé : vendredi 3 mai 2024 06:44
À : dev@kafka.apache.org 
Objet : [SUSPICIOUS EXTERNAL MESSAGE] Re: [DISCUSS] KIP-1033: Add Kafka Streams 
exception handler for exceptions occuring during processing

Warning This might be a fraudulent message! When clicking REPLY, your answers 
will NOT go to the sender (andrew_schofi...@live.com). Instead, replies will be 
sent to dev@kafka.apache.org. Be cautious!

Hi,
I’ve changed my mind on this one having read through the comments.

I don’t think the exception handler should be able to mess with the headers
to the detriment of the code that called the handler.

While I like the hygiene of having an ImmutableHeaders interface,
I feel we can use the existing interface to get the effect we desire.

Thanks,
Andrew


This email was screened for spam and malicious content but exercise caution 
anyway.



On 3 May 2024, at 03:40, Sophie Blee-Goldman  wrote:

I tend to agree that we should just return a pure Headers instead of
introducing a new class/interface to protect overwriting them. I think a
pretty good case has been made already so I won't add onto it, just wanted
to voice my support.

Is that the only remaining question on this KIP? Might be ok to move to a
vote now?

On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:


Hi all, thanks Damien for the KIP!

After looking into the KIP and comments, my only concern is aligned with
one of Matthias comments, around the ImmutableHeaders introduction, with
the motivation not being clear enough. The existing handlers already expose
the headers (indirectly). Ex.
ProductionExceptionHandler.handleSerializationException provides the
ProducerRecord as an argument, so they are already exposed in those
callbacks through record.headers(). Is there a reason to think that it
would be a problem to expose the headers in the
new ProcessingExceptionHandler, but that it's not a problem for the
existing handler?

If there is no real concern about the KS engine requiring those headers, it
feels hard to mentally justify the complexity we transfer to the user by
exposing a new concept into the callbacks to represent the headers. In the
end, it strays aways from the simple/consistent representation of Headers
used all over. Even if eventually the KS engine needs to use the headers
after the callbacks with certainty that they were not altered, still feels
like it's something we could attempt to solve internally, without having to
transfer "new concepts" into the user (ex. the deep-copy as it was
suggested, seems like the kind of trade-off that would maybe be acceptable
here to gain simplicity and consistency among the handlers with a single
existing representation of Headers).

Best!

Lianet



On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:


Thanks for the update.

I am wondering if we should use `ReadOnlyHeaders` instead of
`ImmutableHeaders` as interface name?

Also, the returned `Header` interface is technically not immutable
either, because `Header#key()` returns a mutable byte-array... Would we
need a `ReadOnlyHeader` interface?

If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
of `Headers` but it would rather be a standalone interface, and a
wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
immutable type instead of `byte[]` for the value()?

An alternative would be to deep-copy the value byte-array what would not
be free, but given that we are talking about exception handling, it
would not be on the hot code path, and thus might be acceptable?


The above seems to increase the complexity significantly though. Hence,
I have seconds thoughts on the immutability question:

Do we really need to worry about mutability after all, because in the
end, KS runtime won't read the Headers instance after the handler was
called, and if a user modifies the passed in headers, there won't be any
actual damage (ie, no side effects)? For this case, it might even be ok
to also not add `ImmutableHeaders` to begin with?



Sorry for the forth and back (yes, forth and back, because back and
forth does not make sense -- it's not logical -- just 

[DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-03 Thread Sebastien Viale
Just as a reminder, here the link for the vote:

https://lists.apache.org/thread/o1kvv8zjzsp72ohcjpckdy544ko9tjjb


regards
Sébastien



De : Sebastien Viale 
Envoyé : vendredi 3 mai 2024 10:35
À : dev@kafka.apache.org 
Objet : [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions 
occuring during processing

Hi,
So, we all agree to revert to the regular Headers interface in 
ErrorHandlerContext.
We will update the KIP accordingly.
@Sophie => Yes, this is the last remaining question, and it has been open for 
voting since last week.
Thanks

Sébastien

De : Andrew Schofield 
Envoyé : vendredi 3 mai 2024 06:44
À : dev@kafka.apache.org 
Objet : [SUSPICIOUS EXTERNAL MESSAGE] Re: [DISCUSS] KIP-1033: Add Kafka Streams 
exception handler for exceptions occuring during processing

Warning This might be a fraudulent message! When clicking REPLY, your answers 
will NOT go to the sender (andrew_schofi...@live.com). Instead, replies will be 
sent to dev@kafka.apache.org. Be cautious!

Hi,
I’ve changed my mind on this one having read through the comments.

I don’t think the exception handler should be able to mess with the headers
to the detriment of the code that called the handler.

While I like the hygiene of having an ImmutableHeaders interface,
I feel we can use the existing interface to get the effect we desire.

Thanks,
Andrew


This email was screened for spam and malicious content but exercise caution 
anyway.


> On 3 May 2024, at 03:40, Sophie Blee-Goldman  wrote:
>
> I tend to agree that we should just return a pure Headers instead of
> introducing a new class/interface to protect overwriting them. I think a
> pretty good case has been made already so I won't add onto it, just wanted
> to voice my support.
>
> Is that the only remaining question on this KIP? Might be ok to move to a
> vote now?
>
> On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:
>
>> Hi all, thanks Damien for the KIP!
>>
>> After looking into the KIP and comments, my only concern is aligned with
>> one of Matthias comments, around the ImmutableHeaders introduction, with
>> the motivation not being clear enough. The existing handlers already expose
>> the headers (indirectly). Ex.
>> ProductionExceptionHandler.handleSerializationException provides the
>> ProducerRecord as an argument, so they are already exposed in those
>> callbacks through record.headers(). Is there a reason to think that it
>> would be a problem to expose the headers in the
>> new ProcessingExceptionHandler, but that it's not a problem for the
>> existing handler?
>>
>> If there is no real concern about the KS engine requiring those headers, it
>> feels hard to mentally justify the complexity we transfer to the user by
>> exposing a new concept into the callbacks to represent the headers. In the
>> end, it strays aways from the simple/consistent representation of Headers
>> used all over. Even if eventually the KS engine needs to use the headers
>> after the callbacks with certainty that they were not altered, still feels
>> like it's something we could attempt to solve internally, without having to
>> transfer "new concepts" into the user (ex. the deep-copy as it was
>> suggested, seems like the kind of trade-off that would maybe be acceptable
>> here to gain simplicity and consistency among the handlers with a single
>> existing representation of Headers).
>>
>> Best!
>>
>> Lianet
>>
>>
>>
>> On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:
>>
>>> Thanks for the update.
>>>
>>> I am wondering if we should use `ReadOnlyHeaders` instead of
>>> `ImmutableHeaders` as interface name?
>>>
>>> Also, the returned `Header` interface is technically not immutable
>>> either, because `Header#key()` returns a mutable byte-array... Would we
>>> need a `ReadOnlyHeader` interface?
>>>
>>> If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
>>> of `Headers` but it would rather be a standalone interface, and a
>>> wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
>>> immutable type instead of `byte[]` for the value()?
>>>
>>> An alternative would be to deep-copy the value byte-array what would not
>>> be free, but given that we are talking about exception handling, it
>>> would not be on the hot code path, and thus might be acceptable?
>>>
>>>
>>> The above seems to increase the complexity significantly though. Hence,
>>> I have seconds thoughts on the immutability question:
>>>
>>> Do we really need to worry about mutability after all, because in the
>>> end, KS runtime won't read the Headers instance after the handler was
>>> called, and if a user modifies the passed in headers, there won't be any
>>> actual damage (ie, no side effects)? For this case, it might even be ok
>>> to also not add `ImmutableHeaders` to begin with?
>>>
>>>
>>>
>>> Sorry for the forth and back 

[DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-03 Thread Sebastien Viale
Hi,
So, we all agree to revert to the regular Headers interface in 
ErrorHandlerContext.
We will update the KIP accordingly.
@Sophie => Yes, this is the last remaining question, and it has been open for 
voting since last week.
Thanks

Sébastien

De : Andrew Schofield 
Envoyé : vendredi 3 mai 2024 06:44
À : dev@kafka.apache.org 
Objet : [SUSPICIOUS EXTERNAL MESSAGE] Re: [DISCUSS] KIP-1033: Add Kafka Streams 
exception handler for exceptions occuring during processing

Warning This might be a fraudulent message! When clicking REPLY, your answers 
will NOT go to the sender (andrew_schofi...@live.com). Instead, replies will be 
sent to dev@kafka.apache.org. Be cautious!

Hi,
I’ve changed my mind on this one having read through the comments.

I don’t think the exception handler should be able to mess with the headers
to the detriment of the code that called the handler.

While I like the hygiene of having an ImmutableHeaders interface,
I feel we can use the existing interface to get the effect we desire.

Thanks,
Andrew


This email was screened for spam and malicious content but exercise caution 
anyway.


> On 3 May 2024, at 03:40, Sophie Blee-Goldman  wrote:
>
> I tend to agree that we should just return a pure Headers instead of
> introducing a new class/interface to protect overwriting them. I think a
> pretty good case has been made already so I won't add onto it, just wanted
> to voice my support.
>
> Is that the only remaining question on this KIP? Might be ok to move to a
> vote now?
>
> On Wed, May 1, 2024 at 8:05 AM Lianet M.  wrote:
>
>> Hi all, thanks Damien for the KIP!
>>
>> After looking into the KIP and comments, my only concern is aligned with
>> one of Matthias comments, around the ImmutableHeaders introduction, with
>> the motivation not being clear enough. The existing handlers already expose
>> the headers (indirectly). Ex.
>> ProductionExceptionHandler.handleSerializationException provides the
>> ProducerRecord as an argument, so they are already exposed in those
>> callbacks through record.headers(). Is there a reason to think that it
>> would be a problem to expose the headers in the
>> new ProcessingExceptionHandler, but that it's not a problem for the
>> existing handler?
>>
>> If there is no real concern about the KS engine requiring those headers, it
>> feels hard to mentally justify the complexity we transfer to the user by
>> exposing a new concept into the callbacks to represent the headers. In the
>> end, it strays aways from the simple/consistent representation of Headers
>> used all over. Even if eventually the KS engine needs to use the headers
>> after the callbacks with certainty that they were not altered, still feels
>> like it's something we could attempt to solve internally, without having to
>> transfer "new concepts" into the user (ex. the deep-copy as it was
>> suggested, seems like the kind of trade-off that would maybe be acceptable
>> here to gain simplicity and consistency among the handlers with a single
>> existing representation of Headers).
>>
>> Best!
>>
>> Lianet
>>
>>
>>
>> On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:
>>
>>> Thanks for the update.
>>>
>>> I am wondering if we should use `ReadOnlyHeaders` instead of
>>> `ImmutableHeaders` as interface name?
>>>
>>> Also, the returned `Header` interface is technically not immutable
>>> either, because `Header#key()` returns a mutable byte-array... Would we
>>> need a `ReadOnlyHeader` interface?
>>>
>>> If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
>>> of `Headers` but it would rather be a standalone interface, and a
>>> wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
>>> immutable type instead of `byte[]` for the value()?
>>>
>>> An alternative would be to deep-copy the value byte-array what would not
>>> be free, but given that we are talking about exception handling, it
>>> would not be on the hot code path, and thus might be acceptable?
>>>
>>>
>>> The above seems to increase the complexity significantly though. Hence,
>>> I have seconds thoughts on the immutability question:
>>>
>>> Do we really need to worry about mutability after all, because in the
>>> end, KS runtime won't read the Headers instance after the handler was
>>> called, and if a user modifies the passed in headers, there won't be any
>>> actual damage (ie, no side effects)? For this case, it might even be ok
>>> to also not add `ImmutableHeaders` to begin with?
>>>
>>>
>>>
>>> Sorry for the forth and back (yes, forth and back, because back and
>>> forth does not make sense -- it's not logical -- just trying to fix
>>> English :D) as I did bring up the immutability question in the first
>>> place...
>>>
>>>
>>>
>>> -Matthias
>>>
>>> On 4/25/24 5:56 AM, Loic Greffier wrote:
 Hi Matthias,

 I have updated the KIP regarding points 103 and 108.


Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets

2024-05-03 Thread Bruno Cadonna

Hi Matthias,


101:
Let's assume a RocksDB store, but I think the following might be true 
also for other store implementations. With this KIP, if Kafka Streams 
commits the offsets, the committed offsets will be stored in an 
in-memory data structure (i.e. the memtable) and stay there until 
RocksDB decides that it is time to persist its in-memory data structure. 
If Kafka Streams writes its position to the .position file during a 
commit and a crash happens before RocksDB persist the memtable then the 
position in the .position file is ahead of the persisted offset. If IQ 
is done between the crash and the state store fully restored the 
changelog, the position might tell IQ that the state store is more 
up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same as 
persisting offset, the position should always be consistent with the 
offset, because they are persisted together.



102:
I am confused about your confusion which tells me that we are talking 
about two different things.

You asked

"Do you intent to add this information [i.e. position] to the map passed 
via commit(final Map changelogOffsets)?"


and with what I wrote I meant that we do not need to pass the position 
into the implementation of the StateStore interface since the position 
is updated within the implementation of the StateStore interface (e.g. 
RocksDBStore [1]). My statement describes the behavior now, not the 
change proposed in this KIP, so it does not contradict what is stated in 
the KIP.



200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the 
.checkpoint files to compute the task lag for unassigned tasks whose 
state is locally available. For assigned tasks, it will use the offsets 
managed by the open state store.


Best,
Bruno

[1] 
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397


On 5/1/24 3:00 AM, Matthias J. Sax wrote:

Thanks Bruno.



101: I think I understand this better now. But just want to make sure I 
do. What do you mean by "they can diverge" and "Recovering after a 
failure might load inconsistent offsets and positions."


The checkpoint is the offset from the changelog, while the position is 
the offset from the upstream source topic, right? -- In the end, the 
position is about IQ, and if we fail to update it, it only means that 
there is some gap when we might not be able to query a standby task, 
because we think it's not up-to-date enough even if it is, which would 
resolve itself soon? Ie, the position might "lag", but it's not 
"inconsistent". Do we believe that this lag would be highly problematic?




102: I am confused.

The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. 


This contradicts the KIP:

 these position offsets will be stored in RocksDB, in the same column 
family as the changelog offsets, instead of the .position file




My main concern is currently about rebalance metadata -- opening RocksDB 
stores seems to be very expensive, but if we follow the KIP:


We will do this under EOS by updating the .checkpoint file whenever a 
store is close()d. 


It seems, having the offset inside RocksDB does not help us at all? In 
the end, when we crash, we don't want to lose the state, but when we 
update the .checkpoint only on a clean close, the .checkpoint might be 
stale (ie, still contains the checkpoint when we opened the store when 
we got a task assigned).




-Matthias

On 4/30/24 2:40 AM, Bruno Cadonna wrote:

Hi all,

100
I think we already have such a wrapper. It is called 
AbstractReadWriteDecorator.



101
Currently, the position is checkpointed when a offset checkpoint is 
written. If we let the state store manage the committed offsets, we 
need to also let the state store also manage the position otherwise 
they might diverge. State store managed offsets can get flushed (i.e. 
checkpointed) to the disk when the state store decides to flush its 
in-memory data structures, but the position is only checkpointed at 
commit time. Recovering after a failure might load inconsistent 
offsets and positions.



102
The position is maintained inside the state store, but is persisted in 
the .position file when the state store closes. The only public 
interface that uses the position is IQv2 in a read-only mode. So the 
position is only updated within the state store and read from IQv2. No 
need to add anything to the public StateStore interface.



103
Deprecating managesOffsets() right away might be a good idea.


104
I agree that we should try to support downgrades without wipes. At 
least Nick should state in the KIP why we do not support it.



Best,
Bruno




On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows, that it is a 
complex 

Re: [DISCUSS] KIP-1018: Introduce max remote fetch timeout config

2024-05-03 Thread Luke Chen
Hi Kamal,

Thanks for the KIP!
Sorry for the late review.

Overall LGTM! Just 1 question:

If one fetch request contains 2 partitions: [p1, p2]
fetch.max.wait.ms: 500, remote.fetch.max.wait.ms: 1000

And now, p1 fetch offset is the log end offset and has no new data coming,
and p2 fetch offset is to fetch from remote storage.
And suppose the fetch from remote storage takes 1000ms.
So, question:
Will this fetch request return in 500ms or 1000ms?
And what will be returned?

I think before this change, it'll return within 500ms, right?
But it's not clear what behavior it will be after this KIP.

Thank you.
Luke


On Fri, May 3, 2024 at 1:56 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Christo,
>
> We have localTimeMs, remoteTimeMs, and totalTimeMs as part of the
> FetchConsumer request metric.
>
>
> kafka.network:type=RequestMetrics,name={LocalTimeMs|RemoteTimeMs|TotalTimeMs},request={Produce|FetchConsumer|FetchFollower}
>
> RemoteTimeMs refers to the amount of time spent in the purgatory for normal
> fetch requests
> and amount of time spent in reading the remote data for remote-fetch
> requests. Do we want
> to have a separate `TieredStorageTimeMs` to capture the time spent in
> remote-read requests?
>
> With per-broker level timer metrics combined with the request level
> metrics, the user will have
> sufficient information.
>
> Metric name =
>
> kafka.log.remote:type=RemoteLogManager,name=RemoteLogReaderFetchRateAndTimeMs
>
> --
> Kamal
>
> On Mon, Apr 29, 2024 at 1:38 PM Christo Lolov 
> wrote:
>
> > Heya!
> >
> > Is it difficult to instead add the metric at
> > kafka.network:type=RequestMetrics,name=TieredStorageMs (or some other
> > name=*)? Alternatively, if it is difficult to add it there, is it
> possible
> > to add 2 metrics, one at the RequestMetrics level (even if it is
> > total-time-ms - (all other times)) and one at what you are proposing? As
> an
> > operator I would find it strange to not see the metric in the
> > RequestMetrics.
> >
> > Your thoughts?
> >
> > Best,
> > Christo
> >
> > On Sun, 28 Apr 2024 at 10:52, Kamal Chandraprakash <
> > kamal.chandraprak...@gmail.com> wrote:
> >
> > > Christo,
> > >
> > > Updated the KIP with the remote fetch latency metric. Please take
> another
> > > look!
> > >
> > > --
> > > Kamal
> > >
> > > On Sun, Apr 28, 2024 at 12:23 PM Kamal Chandraprakash <
> > > kamal.chandraprak...@gmail.com> wrote:
> > >
> > > > Hi Federico,
> > > >
> > > > Thanks for the suggestion! Updated the config name to "
> > > > remote.fetch.max.wait.ms".
> > > >
> > > > Christo,
> > > >
> > > > Good point. We don't have the remote-read latency metrics to measure
> > the
> > > > performance of the remote read requests. I'll update the KIP to emit
> > this
> > > > metric.
> > > >
> > > > --
> > > > Kamal
> > > >
> > > >
> > > > On Sat, Apr 27, 2024 at 4:03 PM Federico Valeri <
> fedeval...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Kamal, it looks like all TS configurations starts with "remote."
> > > >> prefix, so I was wondering if we should name it
> > > >> "remote.fetch.max.wait.ms".
> > > >>
> > > >> On Fri, Apr 26, 2024 at 7:07 PM Kamal Chandraprakash
> > > >>  wrote:
> > > >> >
> > > >> > Hi all,
> > > >> >
> > > >> > If there are no more comments, I'll start a vote thread by
> tomorrow.
> > > >> > Please review the KIP.
> > > >> >
> > > >> > Thanks,
> > > >> > Kamal
> > > >> >
> > > >> > On Sat, Mar 30, 2024 at 11:08 PM Kamal Chandraprakash <
> > > >> > kamal.chandraprak...@gmail.com> wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > Bumping the thread. Please review this KIP. Thanks!
> > > >> > >
> > > >> > > On Thu, Feb 1, 2024 at 9:11 PM Kamal Chandraprakash <
> > > >> > > kamal.chandraprak...@gmail.com> wrote:
> > > >> > >
> > > >> > >> Hi Jorge,
> > > >> > >>
> > > >> > >> Thanks for the review! Added your suggestions to the KIP. PTAL.
> > > >> > >>
> > > >> > >> The `fetch.max.wait.ms` config will be also applicable for
> > topics
> > > >> > >> enabled with remote storage.
> > > >> > >> Updated the description to:
> > > >> > >>
> > > >> > >> ```
> > > >> > >> The maximum amount of time the server will block before
> answering
> > > the
> > > >> > >> fetch request
> > > >> > >> when it is reading near to the tail of the partition
> > > >> (high-watermark) and
> > > >> > >> there isn't
> > > >> > >> sufficient data to immediately satisfy the requirement given by
> > > >> > >> fetch.min.bytes.
> > > >> > >> ```
> > > >> > >>
> > > >> > >> --
> > > >> > >> Kamal
> > > >> > >>
> > > >> > >> On Thu, Feb 1, 2024 at 12:12 AM Jorge Esteban Quilcate Otoya <
> > > >> > >> quilcate.jo...@gmail.com> wrote:
> > > >> > >>
> > > >> > >>> Hi Kamal,
> > > >> > >>>
> > > >> > >>> Thanks for this KIP! It should help to solve one of the main
> > > issues
> > > >> with
> > > >> > >>> tiered storage at the moment that is dealing with individual
> > > >> consumer
> > > >> > >>> configurations to avoid flooding logs with interrupted
> > exceptions.
> > 

[DISCUSS] KIP-1042 support for wildcard when creating new acls

2024-05-03 Thread Murali Basani
Hello,

I'd like to propose a suggestion to our resource patterns in Kafka ACLs.

Currently, when adding new ACLs in Kafka, we have two types of resource
patterns for topics:

   - LITERAL
   - PREFIXED

However, when it comes to listing or removing ACLs, we have a couple more
options:

   - MATCH
   - ANY (will match any pattern type)


If we can extend creating acls as well with 'MATCH' pattern type, it would
be very beneficial. Even though this kind of acl should be created with
utmost care, it will help organizations streamline their ACL management
processes.

Example scenarios :

Let's say we need to create ACLs for the following six topics:
nl-accounts-localtopic, nl-accounts-remotetopic, de-accounts-localtopic,
de-accounts-remotetopic, cz-accounts-localtopic, cz-accounts-remotetopic

Currently, we achieve this using existing functionality by creating three
prefixed ACLs as shown below:

kafka-acls --bootstrap-server localhost:9092 \
> --add \
> --allow-principal
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> \
> --producer \
> --topic nl-accounts- \
> --resource-pattern-type prefixed


kafka-acls --bootstrap-server localhost:9092 \
> --add \
> --allow-principal
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> \
> --producer \
> --topic de-accounts- \
> --resource-pattern-type prefixed


kafka-acls --bootstrap-server localhost:9092 \
> --add \
> --allow-principal
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> \
> --producer \
> --topic cz-accounts- \
> --resource-pattern-type prefixed


However, if we had the 'MATCH' pattern type available, we could accomplish
this with a single ACL, as illustrated here:

kafka-acls --bootstrap-server localhost:9092 \
> --add \
> --allow-principal
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> \
> --producer \
> --topic *-accounts-* \
> --resource-pattern-type match



This pattern closely resembles PREFIXED but offers broader allow/deny rules.

Implementing this change could significantly reduce the effort in several
acl management processes.

I welcome your thoughts and any concerns you may have regarding this
proposal.

Thanks,
Murali


[jira] [Created] (KAFKA-16661) add a lower `log.initial.task.delay.ms` value to integration test framework

2024-05-03 Thread Luke Chen (Jira)
Luke Chen created KAFKA-16661:
-

 Summary: add a lower `log.initial.task.delay.ms` value to 
integration test framework
 Key: KAFKA-16661
 URL: https://issues.apache.org/jira/browse/KAFKA-16661
 Project: Kafka
  Issue Type: Test
Reporter: Luke Chen


After KAFKA-16552, we created an internal config `log.initial.task.delay.ms` to 
control the initial task delay in log manager. This ticket follows it up, to 
set a default low value (100ms, 500ms maybe?) for it, to speed up the tests.

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16660) reduce the check interval to speedup DelegationTokenRequestsTest

2024-05-03 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16660:
--

 Summary: reduce the check interval to speedup 
DelegationTokenRequestsTest
 Key: KAFKA-16660
 URL: https://issues.apache.org/jira/browse/KAFKA-16660
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai


the check interval is 1 minute 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L49),
 and `DelegationTokenRequestsTest` wait 2 minutes before running the check 
(https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala#L159)
 ...

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-03 Thread Bruno Cadonna

Hi Sophie,

117f:
I think, removing the STATEFUL and STATELESS types is not enough to 
avoid the error Guozhang mentioned. The StreamsPartitionAssignor passes 
the information whether a task is stateless or stateful into the task 
assignor. However, the task assignor can return a standby task for a 
stateless task which is inconsistent.


Echoing Matthias' statement about the missing UNKNOWN_TASK_ID error.

nit:
The titles of some code blocks in the KIP are not consistent with their 
content, e.g., KafkaStreamsState <-> NodeState



Best,
Bruno

On 5/3/24 2:43 AM, Matthias J. Sax wrote:
Thanks Sophie. My bad. You are of course right about `TaskAssignment` 
and the StreamsPartitionAssignor's responsibitliy to map tasks of a 
instance to consumers. When I wrote my reply, I forgot about this detail.


Seems you did not add `UNKNOWN_TASK_ID` error yet as proposed by Guozhang?

Otherwise LGTM.


-Matthias

On 5/2/24 4:20 PM, Sophie Blee-Goldman wrote:

Guozhang:

117. All three additions make sense to me. However, while thinking about
how users would actually produce an assignment, I realized that it seems
silly to make it their responsibility to distinguish between a stateless
and stateful task when they return the assignment. The
StreamsPartitionAssignor already knows which tasks are stateful vs
stateless, so there's no need to add this extra step for users to 
figure it

out themselves, and potentially make a mistake.

117f: So, rather than add a new error type for "inconsistent task types",
I'm proposing to just flatten the AssignedTask.Type enum to only "ACTIVE"
and "STANDBY", and remove the "STATEFUL" and "STATELESS" types 
altogether.

Any objections?

-

-Thanks, fixed the indentation of headers under "User APIs" and 
"Read-Only

APIs"

-As for the return type of the TaskAssignmentUtils methods, I don't
personally feel too strongly about this, but the reason for the return 
type
being a Map rather than a 
TaskAssignment

is because they are meant to be used iteratively/to create a part of the
full assignment, and not necessarily a full assignment for each. Notice
that they all have an input parameter of the same type: Map. The idea is you can take the output of any of
these and pass it in to another to generate or optimize another piece of
the overall assignment. For example, if you want to perform the 
rack-aware

optimization on both active and standby tasks, you would need to call
#optimizeRackAwareActiveTasks and then forward the output to
#optimizeRackAwareStandbyTasks to get the final assignment. If we 
return a

TaskAssignment, it will usually need to be unwrapped right away. Perhaps
more importantly, I worry that returning a TaskAssignment will make it 
seem

like each of these utility methods return a "full" and final assignment
that can just be returned as-is from the TaskAssignor's #assign method.
Whereas they are each just a single step in the full assignment process,
and not the final product. Does that make sense?

On Thu, May 2, 2024 at 3:50 PM Sophie Blee-Goldman 


wrote:


Matthias:

Thanks for the naming suggestions for the error codes. I was
definitely not happy with my original naming but couldn't think of 
anything
better.  I like your proposals though, will update the KIP names. 
I'll also
add a "NONE" option as well -- much better than just passing in null 
for no

error.


OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the
same active task


  Would also be an error if assigned to two consumers of the same 
client...

Needs to be rephrased.



Well the TaskAssignor only assigns tasks to KafkaStreams clients, 
it's not
responsible for the assignment of tasks to consumers within a 
KafkaStreams.

It would be a bug in the StreamsPartitionAssignor if it received a valid
assignment from the TaskAssignor with only one copy of a task 
assigned to a
single KAfkaStreams client, and then somehow ended up assigning that 
task

to multiple consumers on the KafkaStreams client. It wouldn't be the
TaskAssignor's fault so imo it would not make sense to include this 
case in

the OVERLAPPING_CLIENT error (or as it's now called, ACTIVE_TASK_
ASSIGNED_MULTIPLE_TIMES).  Not to mention, if there was a bug that 
caused

the StreamsPartitionAssignor to assign a task to multiple consumers, it
presumably wouldn't even notice since it's a bug -- if it did notice, it
can just fix the issue. The error codes are about communicating 
unfixable
issues due to the TaskAssignor itself returning an invalid 
assignment. The

phrasing is intentional, and (imo) correct as it is.

I do see your point about how the StreamsPartitionAssignor should
handle/react to invalid assignments though. I'm fine with just 
throwing a

StreamsException and crashing after we invoke the #onAssignmentComputed
callback to notify the user of the error.

On Wed, May 1, 2024 at 9:46 AM Guozhang Wang 


wrote:


Jumping back to the party here :)

107: I agree with the rationale behind this, and

Re: Suggestion about support for wildcard when creating new acls

2024-05-03 Thread Claude Warren, Jr
I think that if this is introduced (and perhaps even if it is not) we need
a clear ACL evaluation process.

I know we have both allow and deny, and that deny takes precedence over
allow.

But let's consider two scenarios

1. Unintended access.

Let's assume we start with the 6 topics Murali used in his
example: nl-accounts-localtopic, nl-accounts-remotetopic,
de-accounts-localtopic, de-accounts-remotetopic, cz-accounts-localtopic,
and cz-accounts-remotetopic
Assume that *-accounts-* pattern granted read access to anyone.
I create an account us-accounts-privatetopic and grant explicit access some
individuals.
Because of the *-accounts-* everyone has access to my privatetopic, and I
may not know that there is a leak until it is far too late.
I don't have a good way to determine which ACLs will impact my topic.
I cannot add a general us-accounts-privatetopic DENY and hope my explicit
access works because DENY takes precedence over ALLOW.

2. Unintended/Hostile denial
Let's assume we start with the 6 topics Murali used in his
example: nl-accounts-localtopic, nl-accounts-remotetopic,
de-accounts-localtopic, de-accounts-remotetopic, cz-accounts-localtopic,
and cz-accounts-remotetopic
Assume that *-accounts-* pattern grants read access to anyone.
A bad or carless actor could create *-* DENY which would cause the system
to cease functioning as expected.
There is not a good way to determine which ACLs impacted the topic.


*Note* that both of these issues can occur with the PREFIXED pattern as
well, so this is not an argument against the MATCH pattern.  There is a
fundamental issue with the current ACL implementation as relates to
wildcards.

I think that the evaluation process should be:

   1. Remove patterns that do not match
   2. More specific patterns take precedence over less specific patterns
   3. for patterns of the same precedence DENY overrides ALLOW

*Determining specificity*:

Specificity is generally based on the Levenshtein distance between the
pattern and the text being evaluated.  The lower the distance the more
specific the rule.
Using the topic name: nl-accounts-localtopic we can evaluate the
Levenshtein distance for various patterns.

nl-accounts-localtopic = 0
*-accounts-localtopic = 2
nl-accounts-local* = 5
*-accounts-local* = 7
nl-accounts-* = 10
*-accounts-* = 12

In the special case of matching principles User matches are more specific
than Group matches.

*Usability*

With the ACL system becoming a complex web of patterns, it is
incumbent upon the development team to provide tools to assist in
permissions problem determination.

1.  There should be a tool that will provide a list of all ACLs that impact
the decision to allow or deny access for a principal to a topic based on
principal ID, host, and operation.  This will assist operators in rapidly
determining the reason for access denied errors.
2. There should be a tool to show the effects of adding an ACL.  Using the
example from above adding *-accounts-*", should list that
nl-accounts-localtopic, nl-accounts-remotetopic,
de-accounts-localtopic, de-accounts-remotetopic, cz-accounts-localtopic,
and cz-accounts-remotetopic are affected.
3. There should be a tool to show the effects of adding a topic.  Using the
example from above adding *us-accounts-privatetopic", should list that
"*-accounts-*" will influence the permissions calculations for the new
topic.

*Summary*

I leave determining whether or not adding MATCH as a pattern type is a good
idea to others with more experience in Kafka.  But in either case, I
believe that we need to look at how we evaluate ACLs given that we already
have a wild card ACL pattern.

Claude

On Thu, May 2, 2024 at 3:56 PM Murali Basani 
wrote:

> Hello,
>
> I'd like to propose a suggestion to our resource patterns in Kafka ACLs.
>
> Currently, when adding new ACLs in Kafka, we have two types of resource
> patterns for topics:
>
>- LITERAL
>- PREFIXED
>
> However, when it comes to listing or removing ACLs, we have a couple more
> options:
>
>- MATCH
>- ANY (will match any pattern type)
>
>
> If we can extend creating acls as well with 'MATCH' pattern type, it would
> be very beneficial. Even though this kind of acl should be created with
> utmost care, it will help organizations streamline their ACL management
> processes.
>
> Example scenarios :
>
> Let's say we need to create ACLs for the following six topics:
> nl-accounts-localtopic, nl-accounts-remotetopic, de-accounts-localtopic,
> de-accounts-remotetopic, cz-accounts-localtopic, cz-accounts-remotetopic
>
> Currently, we achieve this using existing functionality by creating three
> prefixed ACLs as shown below:
>
> kafka-acls --bootstrap-server localhost:9092 \
> > --add \
> > --allow-principal
> >
> User:CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown
> > \
> > --producer \
> > --topic nl-accounts- \
> > --resource-pattern-type prefixed
>
>
> kafka-acls --bootstrap-server localhost:9092 \
> > --add \
> > 

Re: [DISCUSS] KIP-950: Tiered Storage Disablement

2024-05-03 Thread Luke Chen
Also, I think using `stopReplicas` request is a good idea because it won't
cause any problems while migrating to KRaft mode.
The stopReplicas request is one of the request that KRaft controller will
send to ZK brokers during migration.

Thanks.
Luke

On Fri, May 3, 2024 at 11:48 AM Luke Chen  wrote:

> Hi Christo,
>
> Thanks for the update.
>
> Questions:
> 1. For this
> "The possible state transition from DISABLED state is to the ENABLED."
> I think it only applies for KRaft mode. In ZK mode, the possible state is
> "DISABLING", right?
>
> 2. For this:
> "If the cluster is using Zookeeper as the control plane, enabling remote
> storage for a topic triggers the controller to send this information to
> Zookeeper. Each broker listens for changes in Zookeeper, and when a change
> is detected, the broker triggers RemoteLogManager#onLeadershipChange()."
>
> I think the way ZK brokers knows the leadership change is by getting the
> LeaderAndISRRequeset from the controller, not listening for changes in ZK.
>
> 3. In the KRaft handler steps, you said:
> "The controller also updates the Topic metadata to increment the
> tiered_epoch and update the tiered_stateto DISABLING state."
>
> Should it be "DISABLED" state since it's KRaft mode?
>
> 4. I was thinking how we handle the tiered_epoch not match error.
> For ZK, I think the controller won't write any data into ZK Znode,
> For KRaft, either configRecord or updateTopicMetadata records won't be
> written.
> Is that right? Because the current workflow makes me think there will be
> partial data updated in ZK/KRaft when tiered_epoch error.
>
> 5. Since we changed to use stopReplicas (V5) request now, the diagram for
> ZK workflow might also need to update.
>
> 6. In ZK mode, what will the controller do if the "stopReplicas" responses
> not received from all brokers? Reverting the changes?
> This won't happen in KRaft mode because it's broker's responsibility to
> fetch metadata update from controller.
>
>
> Thank you.
> Luke
>
>
> On Fri, Apr 19, 2024 at 10:23 PM Christo Lolov 
> wrote:
>
>> Heya all!
>>
>> I have updated KIP-950. A list of what I have updated is:
>>
>> * Explicitly state that Zookeeper-backed clusters will have ENABLED ->
>> DISABLING -> DISABLED while KRaft-backed clusters will only have ENABLED ->
>> DISABLED
>> * Added two configurations for the new thread pools and explained where
>> values will be picked-up mid Kafka version upgrade
>> * Explained how leftover remote partitions will be scheduled for deletion
>> * Updated the API to use StopReplica V5 rather than a whole new
>> controller-to-broker API
>> * Explained that the disablement procedure will be triggered by the
>> controller listening for an (Incremental)AlterConfig change
>> * Explained that we will first move log start offset and then issue a
>> deletion
>> * Went into more details that changing remote.log.disable.policy after
>> disablement won't do anything and that if a customer would like additional
>> data deleted they would have to use already existing methods
>>
>> Let me know if there are any new comments or I have missed something!
>>
>> Best,
>> Christo
>>
>> On Mon, 15 Apr 2024 at 12:40, Christo Lolov 
>> wrote:
>>
>>> Heya Doguscan,
>>>
>>> I believe that the state of the world after this KIP will be the
>>> following:
>>>
>>> For Zookeeper-backed clusters there will be 3 states: ENABLED, DISABLING
>>> and DISABLED. We want this because Zookeeper-backed clusters will await a
>>> confirmation from the brokers that they have indeed stopped tiered-related
>>> operations on the topic.
>>>
>>> For KRaft-backed clusters there will be only 2 states: ENABLED and
>>> DISABLED. KRaft takes a fire-and-forget approach for topic deletion. I
>>> believe the same approach ought to be taken for tiered topics. The
>>> mechanism which will ensure that leftover state in remote due to failures
>>> is cleaned up to me is the retention mechanism. In today's code, a leader
>>> deletes all segments it finds in remote with offsets below the log start
>>> offset. I believe this will be good enough for cleaning up leftover state
>>> in remote due to failures.
>>>
>>> I know that quite a few changes have been discussed so I will aim to put
>>> them on paper in the upcoming days and let everyone know!
>>>
>>> Best,
>>> Christo
>>>
>>> On Tue, 9 Apr 2024 at 14:49, Doğuşcan Namal 
>>> wrote:
>>>
 +1 let's not introduce a new api and mark it immediately as deprecated
 :)

 On your second comment Luke, one thing we need to clarify is when do we
 consider remote storage to be DISABLED for a topic?
 Particularly, what is the state when the remote storage is being
 deleted in case of disablement.policy=delete? Is it DISABLING or DISABLED?

 If we move directly to the DISABLED state,

 a) in case of failures, the leaders should continue remote storage
 deletion even if the topic is moved to the DISABLED state, otherwise we
 risk having stray data on remote 

[jira] [Resolved] (KAFKA-16572) allow defining number of disks per broker in ClusterTest

2024-05-03 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16572?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16572.

Fix Version/s: 3.8.0
   Resolution: Fixed

> allow defining number of disks per broker in ClusterTest
> 
>
> Key: KAFKA-16572
> URL: https://issues.apache.org/jira/browse/KAFKA-16572
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> This is a follow-up of KAFKA-16559



--
This message was sent by Atlassian Jira
(v8.20.10#820010)