Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-01 Thread Andrew Schofield
Hi Jun,
Thanks for your reply.

147. Perhaps the easiest is to take a look at the code in
o.a.k.clients.consumer.internals.MembershipManagerImpl.
This class is part of the new consumer group protocol
code in the client. It makes state transitions based on
the heartbeat requests and responses, and it makes a
judgement about whether a received assignment is
equal to the one it is already using. When a state transition
is deemed to be the beginning or end of a rebalance
from the point of view of this client, it counts towards the
rebalance metrics.

Share groups will follow the same path.
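
To make the shape of this concrete, here is a minimal, hypothetical sketch
(not the actual MembershipManagerImpl code) of counting rebalances purely from
heartbeat-driven state transitions; all names are illustrative.

enum MemberState { STABLE, RECONCILING }

class RebalanceMetricsSketch {
    private MemberState state = MemberState.STABLE;
    private long rebalanceStartMs;
    private long rebalanceCount;

    // Called when a heartbeat response carries an assignment that differs
    // from the one currently in use.
    void onNewAssignment(long nowMs) {
        if (state == MemberState.STABLE) {
            state = MemberState.RECONCILING;   // start of a rebalance, from this client's view
            rebalanceStartMs = nowMs;
        }
    }

    // Called once the member has reconciled and acknowledged the assignment
    // via its next heartbeat.
    void onReconciliationComplete(long nowMs) {
        if (state == MemberState.RECONCILING) {
            state = MemberState.STABLE;        // end of the rebalance
            rebalanceCount++;
            recordRebalanceLatency(nowMs - rebalanceStartMs);
        }
    }

    private void recordRebalanceLatency(long ms) {
        // feed the rebalance latency sensor here
    }
}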

150. I do not consider it a concern. Rebalancing a share group
is less disruptive than rebalancing a consumer group. If the assignor
has information about existing assignments, it can use it. It is
true that this information cannot be replayed from a topic and will
sometimes be unknown as a result.

151. I don’t want to rename TopicPartitionsMetadata to
simply TopicPartitions (it’s information about the partitions of
a topic) because we then have an array of plurals.
I’ve renamed Metadata to Info. That’s a bit less cumbersome.

152. Fixed.

153. It’s the GC. Fixed.

154. The UNKNOWN “state” is essentially a default for situations where
the code cannot understand data it received. For example, let’s say that
Kafka 4.0 has groups with states EMPTY, STABLE, DEAD. If Kafka 4.1
introduced another state THINKING, a tool built with Kafka 4.0 would not
know what THINKING meant. It would use “UNKNOWN” to indicate that the
state was something that it could not understand.
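
For illustration, here is a hypothetical sketch of the usual forward-compatibility
pattern (not the actual tool code): any state name the enum does not recognise is
mapped to UNKNOWN.

enum ShareGroupState {
    EMPTY, STABLE, DEAD, UNKNOWN;

    static ShareGroupState parse(String name) {
        for (ShareGroupState s : values()) {
            if (s.name().equalsIgnoreCase(name)) {
                return s;
            }
        }
        return UNKNOWN;   // e.g. a future "THINKING" state sent by a newer broker
    }
}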

155. No, it’s at the level of the share-partition. If the offsets for just
one share-partition are reset, only the state epoch for that partition is
updated.

156. Strictly speaking, it’s redundant. I think having the StartOffset
separate gives helpful clarity and I prefer to retain it.

157. Yes, you are right. There’s no reason why a leader change needs
to force a ShareSnapshot. I’ve added leaderEpoch to the ShareUpdate.

158. Although ReadShareGroupOffsetsState is a bit of a mouthful,
having “State” in the name makes it clear that this is one of the family of
inter-broker RPCs served by the share coordinator. The admin RPCs
such as DescribeShareGroupOffsets do not include “State”.

159. Fixed.

160. Fixed.

Thanks,
Andrew

> On 2 May 2024, at 00:29, Jun Rao  wrote:
> 
> Hi, Andrew,
> 
> Thanks for the reply.
> 
> 147. "The measurement is certainly from the point of view of the client,
> but it’s driven by sending and receiving heartbeats rather than whether the
> client triggered the rebalance itself."
> Hmm, how does a client know which heartbeat response starts a rebalance?
> 
> 150. PartitionAssignor takes existing assignments into consideration. Since
> GC doesn't persist the assignment for share groups, it means that
> ShareGroupPartitionAssignor can't reliably depend on existing assignments.
> Is that a concern?
> 
> 151. ShareGroupPartitionMetadataValue: Should we rename
> TopicPartitionsMetadata and TopicMetadata since there is no metadata?
> 
> 152. ShareGroupMetadataKey: "versions": "3"
> The versions should be 11.
> 
> 153. ShareGroupDescription.coordinator(): The description says "The share
> group coordinator". Is that the GC or SC?
> 
> 154. "A share group has only three states - EMPTY , STABLE and DEAD".
>  What about UNKNOWN?
> 
> 155. WriteShareGroupState: StateEpoch is at the group level, not partition
> level, right?
> 
> 156. ShareSnapshotValue: Is StartOffset redundant since it's the same as
> the smallest FirstOffset in StateBatches?
> 
> 157. Every leader change forces a ShareSnapshotValue write to persist the
> new leader epoch. Is that a concern? An alternative is to include
> leaderEpoch in ShareUpdateValue.
> 
> 158. ReadShareGroupOffsetsState: The state is the offsets. Should we rename
> it to something like ReadShareGroupStartOffset?
> 
> 159. members are assigned members round-robin => members are assigned
> round-robin
> 
> 160. "may called": typo
> 
> Jun
> 
> On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield 
> wrote:
> 
>> Hi Jun,
>> Thanks for the reply and sorry for the delay in responding.
>> 
>> 123. Yes, I didn’t quite get your point earlier. The member
>> epoch is bumped by the GC when it sends a new assignment.
>> When the member sends its next heartbeat, it echoes back
>> the member epoch, which will confirm the receipt of the
>> assignment. It would send the same member epoch even
>> after recovery of a network disconnection, so that should
>> be sufficient to cope with this eventuality.
>> 
>> 125. Yes, I have added it to the table which now matches
>> the text earlier in the KIP. Thanks.
>> 
>> 140. Yes, I have added it to the table which now matches
>> the text earlier in the KIP. I’ve also added more detail for
>> the case where the entire share group is being deleted.
>> 
>> 141. Yes! Sorry for confusing things.
>> 
>> Back to the original question for this point. To delete a share
>> group, should the GC write a tombstone 

Re: [DISCUSS] KIP-1027 Add MockFixedKeyProcessorContext

2024-05-01 Thread Matthias J. Sax

Any updates on this KIP?

On 3/28/24 4:11 AM, Matthias J. Sax wrote:
It seems that `MockRecordMetadata` is a private class, and thus not part 
of the public API. If there are any changes required, we don't need to 
discuss them in the KIP.



For `CapturedPunctuator` and `CapturedForward` it's a little bit more 
tricky. My gut feeling is that the classes might not need to be 
changed, but if we use them within `MockProcessorContext` and 
`MockFixedKeyProcessorContext` it might be weird to keep the current 
nesting... The problem I see is that it's not straightforward to 
move the classes w/o breaking compatibility, nor to duplicate them as 
standalone classes w/o a larger "splash radius". (We would need to add 
new overloads for MockProcessorContext#scheduledPunctuators() and 
MockProcessorContext#forwarded()).


Might be good to hear from others whether we think it's worth these larger 
changes to get rid of the nesting, or whether we just accept the somewhat 
less-than-ideal nesting, as it technically is not a real issue?



-Matthias


On 3/15/24 1:47 AM, Shashwat Pandey wrote:

Thanks for the feedback Matthias!

The reason I proposed the extension of MockProcessorContext was more to do
with the internals of the class (MockRecordMetadata, CapturedPunctuator and
CapturedForward).

However, I do see your point. I would then think to split
MockProcessorContext and MockFixedKeyProcessorContext; some of the internal
classes should also be extracted, i.e. MockRecordMetadata,
CapturedPunctuator and probably a new CapturedFixedKeyForward.

Let me know what you think!


Regards,
Shashwat Pandey


On Mon, Mar 11, 2024 at 10:09 PM Matthias J. Sax  
wrote:



Thanks for the KIP Shashwat. Closing this testing gap is great! It did
come up a few time already...

One question: why do you propose to `extend MockProcessorContext`?

Given how the actual runtime context classes are set up, it seems that
the regular context and fixed-key-context are distinct, and thus I
believe both mock-context classes should be distinct, too?

What I mean is that FixedKeyProcessorContext does not extend
ProcessorContext. Both classes have a common parent ProcessINGContext
(note the very similar but different names), but they are "siblings"
only, so why make the mock processor a parent-child relationship?

It seems better to do

public class MockFixedKeyProcessorContext<KForward, VForward>
    implements FixedKeyProcessorContext<KForward, VForward>,
               RecordCollector.Supplier


Of course, if there is code we can share between both mock-contexts we
should do this, but it should not leak into the public API?


-Matthias



On 3/11/24 5:21 PM, Shashwat Pandey wrote:

Hi everyone,

I would like to start the discussion on


https://cwiki.apache.org/confluence/display/KAFKA/KIP-1027%3A+Add+MockFixedKeyProcessorContext


This adds MockFixedKeyProcessorContext to the Kafka Streams Test Utils
library.

Regards,
Shashwat Pandey







[jira] [Created] (KAFKA-16655) deflake ZKMigrationIntegrationTest.testDualWrite

2024-05-01 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-16655:


 Summary: deflake ZKMigrationIntegrationTest.testDualWrite
 Key: KAFKA-16655
 URL: https://issues.apache.org/jira/browse/KAFKA-16655
 Project: Kafka
  Issue Type: Improvement
Reporter: Alyssa Huang
Assignee: Alyssa Huang


{code:java}
Failed to map supported failure 'org.opentest4j.AssertionFailedError: expected: 
not equal but was: <0>' with mapper 
'org.gradle.api.internal.tasks.testing.failure.mappers.OpenTestAssertionFailedMapper@59b5251d':
 Cannot invoke "Object.getClass()" because "obj" is null

> Task :core:test
kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8] failed, 
log available in 
/Users/ahuang/ce-kafka/core/build/reports/testOutput/kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ClusterInstance)[8].test.stdout

Gradle Test Run :core:test > Gradle Test Executor 8 > 
ZkMigrationIntegrationTest > testDualWrite(ClusterInstance) > testDualWrite [8] 
Type=ZK, MetadataVersion=3.8-IV0, Security=PLAINTEXT FAILED
org.opentest4j.AssertionFailedError: expected: not equal but was: <0>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:152)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at 
app//org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:277)
at 
app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:119)
at 
app//org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:111)
at 
app//org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2121)
at 
app//kafka.zk.ZkMigrationIntegrationTest.testDualWrite(ZkMigrationIntegrationTest.scala:995)
 {code}

This test occasionally fails due to stale broker epoch exceptions, which in 
turn cause producer ID allocation to fail.

Also fixes {{sendAllocateProducerIds}} erroneously returning 0 as the 
`producerIdStart` in error cases (because `onComplete` only accounts for 
timeouts and ignores any other error code).
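
A hypothetical sketch of handling every error code in the completion callback (this 
is illustrative only, not the actual {{sendAllocateProducerIds}} handler; 
{{retryLater}}/{{failWith}}/{{complete}} are placeholders):

{code:java}
import org.apache.kafka.common.message.AllocateProducerIdsResponseData;
import org.apache.kafka.common.protocol.Errors;

class AllocateProducerIdsCompletion {
    void onComplete(AllocateProducerIdsResponseData response) {
        Errors error = Errors.forCode(response.errorCode());
        if (error == Errors.REQUEST_TIMED_OUT) {
            retryLater();
        } else if (error != Errors.NONE) {
            // the missing branch: surface errors such as STALE_BROKER_EPOCH
            failWith(error.exception());
        } else {
            complete(response.producerIdStart(), response.producerIdLen());
        }
    }

    private void retryLater() { }
    private void failWith(Exception e) { }
    private void complete(long producerIdStart, int producerIdLen) { }
}
{code}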


{code:java}
[2024-04-12 18:45:08,820] INFO [ControllerServer id=3000] allocateProducerIds: 
event failed with StaleBrokerEpochException in 19 microseconds. 
(org.apache.kafka.controller.QuorumController:765) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16647) Remove setMetadataDirectory from BrokerNode/ControllerNode

2024-05-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16647.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Remove setMetadataDirectory from BrokerNode/ControllerNode
> --
>
> Key: KAFKA-16647
> URL: https://issues.apache.org/jira/browse/KAFKA-16647
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Kuan Po Tseng
>Priority: Minor
> Fix For: 3.8.0
>
>
> `TestKitNodes` does not enable callers to define the location of the "base 
> folder". That makes sense to me since callers should not care about it. That 
> means the location of the metadata folder should be transparent to callers. Hence, 
> the setter of the metadata folder is useless.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16614) Disallow `@ClusterTemplate("")`

2024-05-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16614?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16614.

Fix Version/s: 3.8.0
   Resolution: Fixed

> Disallow `@ClusterTemplate("")`
> ---
>
> Key: KAFKA-16614
> URL: https://issues.apache.org/jira/browse/KAFKA-16614
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TaiJuWu
>Priority: Minor
> Fix For: 3.8.0
>
>
> `@ClusterTemplate` enables us to create dynamic configs, and it expects to 
> accept a method name which can create server configs at runtime. It throws an 
> error when we pass a nonexistent method name, but it works if we pass an 
> empty name



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16654) Refactor kafka.test.annotation.Type and ClusterTestExtensions

2024-05-01 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16654:
--

 Summary: Refactor kafka.test.annotation.Type and 
ClusterTestExtensions
 Key: KAFKA-16654
 URL: https://issues.apache.org/jira/browse/KAFKA-16654
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


It seems to me the refactor could include the following tasks.

1. Change `invocationContexts`, the method invoked by `ClusterTemplate`, and the 
generate-related methods in `ClusterTestExtensions` to return a 
java.util.Collection instead of accepting a `java.util.function.Consumer` (a rough 
sketch is shown after this list). That brings two benefits: 1) simpler production 
code, since we don't need to create a List and then pass a function around to 
collect stuff; 2) easier to write unit tests.

2. Separate `provideTestTemplateInvocationContexts` into multiple methods, one per 
annotation. That can help us to write tests, and makes the code more readable.
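
A rough before/after sketch of task 1 (the signatures are illustrative, not the 
actual {{ClusterTestExtensions}} API):

{code:java}
import java.util.Collection;
import java.util.function.Consumer;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;

interface InvocationContextGeneration {

    // Before: the caller builds a list and hands in a collector.
    void invocationContexts(String baseDisplayName,
                            Consumer<TestTemplateInvocationContext> collector);

    // After: simply return the generated contexts, which is easier to unit test.
    Collection<TestTemplateInvocationContext> invocationContexts(String baseDisplayName);
}
{code}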



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16653) Remove delayed initialization because of static voter set

2024-05-01 Thread Jira
José Armando García Sancio created KAFKA-16653:
--

 Summary: Remove delayed initialization because of static voter set
 Key: KAFKA-16653
 URL: https://issues.apache.org/jira/browse/KAFKA-16653
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: José Armando García Sancio


Once KRaft supports the AddVoter RPC, the QuorumTestHarness and 
KRaftClusterTestKit can be reimplemented to use dynamic voters instead of the 
static voter set.

This should allow us to remove KRaft's support for delayed static voter set 
initialization.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: DISCUSS KIP-984 Add pluggable compression interface to Kafka

2024-05-01 Thread Greg Harris
Hi Assane,

Thanks for the update. Unfortunately, I don't think that the design
changes have solved all of the previous concerns, and I feel it has
raised new ones.

From my earlier email:
1. The KIP has now included Python, but this feature is still
disproportionately difficult for statically-linked languages to
support.
2. This is unaddressed.
3. This is unaddressed.
4. The KIP now includes a metadata topic that is used to persist a
mapping from the binary ID to full class name, but requires the
operator to manage this mapping.

My new concerns are:
5. It is not possible to interpret a single message without also
reading from this additional metadata (messages are not self
contained)
6. There are a finite number of pluggable IDs, and this finite number
is baked into the protocol.
6a. This is a problem with the existing binary protocol, but this is
acceptable as the frequency that a new protocol is added is quite low,
and is discussed with the community.
6b. Someone experimenting with compression plugins could easily
exhaust this limit in a single day, and the limit is exhausted for the
lifetime of the cluster. This could be done accidentally or
maliciously.
6c. Consuming 4 of the remaining 8 reserved bits feels wasteful,
compared to the benefit that the protocol is receiving from this
feature.
7. Implementing support for this feature would require distributing
and caching the metadata, which is a significant increase in
complexity compared to the current compression mechanisms.

From your motivation section:
> Although compression is not a new problem, it has continued to be an 
> important research topic.
> The integration and testing of new compression algorithms into Kafka 
> currently requires significant code changes and rebuilding of the 
> distribution package for Kafka.

I think it is completely appropriate for someone testing an
experimental compression algorithm to temporarily fork Kafka, and then
discard that fork and all of the compressed data when the experiment
is over.
The project has to balance the experience of upstream developers
(including compression researchers), ecosystem developers, and
operators, and this proposal's cost to ecosystem developers and
operators is too high to justify the benefits.

As an alternative, have you considered implementing a custom
Serializer/Deserializer that could implement this feature, and just
leave the Kafka compression off?
I think an "Add Brotli Compression" KIP is definitely worth pursuing,
if that is the compression algorithm you have in mind currently.
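
As an illustration of the Serializer/Deserializer alternative above, here is a
minimal sketch using GZIP purely as a stand-in for whatever experimental codec is
being evaluated; a matching Deserializer would reverse it. This is not part of
any KIP.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class CompressingSerializer implements Serializer<byte[]> {
    @Override
    public byte[] serialize(String topic, byte[] data) {
        if (data == null) return null;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(data);              // swap in Brotli or any other codec here
        } catch (IOException e) {
            throw new SerializationException(e);
        }
        return out.toByteArray();
    }
}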

Thanks,
Greg


On Mon, Apr 29, 2024 at 3:10 PM Diop, Assane  wrote:
>
> Hi Divij, Greg and Luke,
> I have updated the KIP for Kafka pluggable compression, addressing the 
> concerns with the original design.
> I believe this new design takes into account lots of the concerns and has solved 
> them. I would like to receive feedback on it as I am working on getting 
> this KIP accepted. I am not targeting a release or anything, but accepting the 
> concept will help move things in this direction.
>
> The link to the KIP is here 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-984%3A+Add+pluggable+compression+interface+to+Kafka
>
> Assane
>
> -Original Message-
> From: Diop, Assane 
> Sent: Wednesday, April 24, 2024 4:58 PM
> To: dev@kafka.apache.org
> Subject: RE:DISCUSS KIP-984 Add pluggable compression interface to Kafka
>
> Hi,
>
> I would like to bring back attention to 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-984%3A+Add+pluggable+compression+interface+to+Kafka
> I have made significant changes to the design to accommodate the concerns, and 
> would like some feedback from the community to get the conversation going.
>
> Assane
>
> -Original Message-
> From: Diop, Assane
> Sent: Friday, March 1, 2024 4:45 PM
> To: dev@kafka.apache.org
> Subject: RE: DISCUSS KIP-984 Add pluggable compression interface to Kafka
>
> Hi Luke,
>
> The proposal doesn't preclude supporting multiple clients but each client 
> would need an implementation of the pluggable architecture.
> At the very least we envision other clients such as librdkafka and 
> kafka-python could be supported by C implementations.
>
> We agree with community feedback regarding the need to support these clients, 
> and we are looking at alternative approaches for brokers and clients to 
> coordinate the plugin.
>
> One way to do this coordination is each client should have a configuration 
> mapping of the plugin name to its implementation.
>
> Assane
>
>
>
>
>
>
> -Original Message-
> From: Luke Chen 
> Sent: Monday, February 26, 2024 7:50 PM
> To: dev@kafka.apache.org
> Subject: Re: DISCUSS KIP-984 Add pluggable compression interface to Kafka
>
> Hi Assane,
>
> I also share the same concern as Greg, which is that the KIP is not Kafka 
> ecosystem friendly.
> And this will make the Kafka client and broker tightly coupled, so that 
> once you use the pluggable compression interface, the producer 

Re: [DISCUSS] KIP-932: Queues for Kafka

2024-05-01 Thread Jun Rao
Hi, Andrew,

Thanks for the reply.

147. "The measurement is certainly from the point of view of the client,
but it’s driven by sending and receiving heartbeats rather than whether the
client triggered the rebalance itself."
Hmm, how does a client know which heartbeat response starts a rebalance?

150. PartitionAssignor takes existing assignments into consideration. Since
GC doesn't persist the assignment for share groups, it means that
ShareGroupPartitionAssignor can't reliably depend on existing assignments.
Is that a concern?

151. ShareGroupPartitionMetadataValue: Should we rename
TopicPartitionsMetadata and TopicMetadata since there is no metadata?

152. ShareGroupMetadataKey: "versions": "3"
 The versions should be 11.

153. ShareGroupDescription.coordinator(): The description says "The share
group coordinator". Is that the GC or SC?

154. "A share group has only three states - EMPTY , STABLE and DEAD".
  What about UNKNOWN?

155. WriteShareGroupState: StateEpoch is at the group level, not partition
level, right?

156. ShareSnapshotValue: Is StartOffset redundant since it's the same as
the smallest FirstOffset in StateBatches?

157. Every leader change forces a ShareSnapshotValue write to persist the
new leader epoch. Is that a concern? An alternative is to include
leaderEpoch in ShareUpdateValue.

158. ReadShareGroupOffsetsState: The state is the offsets. Should we rename
it to something like ReadShareGroupStartOffset?

159. members are assigned members round-robin => members are assigned
round-robin

160. "may called": typo

Jun

On Mon, Apr 29, 2024 at 10:11 AM Andrew Schofield 
wrote:

> Hi Jun,
> Thanks for the reply and sorry for the delay in responding.
>
> 123. Yes, I didn’t quite get your point earlier. The member
> epoch is bumped by the GC when it sends a new assignment.
> When the member sends its next heartbeat, it echoes back
> the member epoch, which will confirm the receipt of the
> assignment. It would send the same member epoch even
> after recovery of a network disconnection, so that should
> be sufficient to cope with this eventuality.
>
> 125. Yes, I have added it to the table which now matches
> the text earlier in the KIP. Thanks.
>
> 140. Yes, I have added it to the table which now matches
> the text earlier in the KIP. I’ve also added more detail for
> the case where the entire share group is being deleted.
>
> 141. Yes! Sorry for confusing things.
>
> Back to the original question for this point. To delete a share
> group, should the GC write a tombstone for each
> ShareGroupMemberMetadata record?
>
> Tombstones are necessary to delete ShareGroupMemberMetadata
> records. But, deletion of a share group is only possible when
> the group is already empty, so the tombstones will have
> been written as a result of the members leaving the group.
>
> 143. Yes, that’s right.
>
> 147. The measurement is certainly from the point of view
> of the client, but it’s driven by sending and receiving heartbeats
> rather than whether the client triggered the rebalance itself.
> The client decides when it enters and leaves reconciliation
> of the assignment, and measures this period.
>
>
> Thanks,
> Andrew
>
>
> > On 26 Apr 2024, at 09:43, Jun Rao  wrote:
> >
> > Hi, Andrew,
> >
> > Thanks for the reply.
> >
> > 123. "Rather than add the group epoch to the ShareGroupHeartbeat, I have
> > decided to go for TopicPartitions in ShareGroupHeartbeatRequest which
> > mirrors ConsumerGroupHeartbeatRequest."
> > ShareGroupHeartbeat.MemberEpoch is the group epoch, right? Is that enough
> > for confirming the receipt of the new assignment?
> >
> > 125. This also means that "Alter share group offsets" needs to write a
> > ShareGroupPartitionMetadata record, if the partition is not already
> > initialized.
> >
> > 140. In the table for "Delete share group offsets", we need to add a step
> > to write a ShareGroupPartitionMetadata record with DeletingTopics.
> >
> > 141. Hmm, ShareGroupMemberMetadata is stored in the __consumer_offsets
> > topic, which is a compacted topic, right?
> >
> > 143. So, the client sends DescribeShareGroupOffsets requests to GC, which
> > then forwards it to SC?
> >
> > 147. I guess a client only knows the rebalance triggered by itself, but
> not
> > the ones triggered by other members or topic/partition changes?
> >
> > Jun
> >
> > On Thu, Apr 25, 2024 at 4:19 AM Andrew Schofield <
> andrew_schofi...@live.com>
> > wrote:
> >
> >> Hi Jun,
> >> Thanks for the response.
> >>
> >> 123. Of course, ShareGroupHeartbeat started off as ConsumerGroupHeartbeat
> >> and then unnecessary fields were removed. In the network issue case,
> >> there is not currently enough state being exchanged to be sure an
> >> assignment
> >> was received.
> >>
> >> Rather than add the group epoch to the ShareGroupHeartbeat, I have
> decided
> >> to go for TopicPartitions in ShareGroupHeartbeatRequest which mirrors
> >> ConsumerGroupHeartbeatRequest. It means the share group member does
> >> confirm the 

Re: [VOTE] KIP-477: Add PATCH method for connector config in Connect REST API

2024-05-01 Thread Greg Harris
Hi Ivan,

Thank you for the KIP!
I think PATCH using the same return type as PUT benefits the clients,
even though the "created" field will always be false.

+1 (binding)

Thanks,
Greg

On Fri, Apr 12, 2024 at 4:31 AM Yash Mayya  wrote:
>
> Hi Ivan,
>
> Thanks for reviving this KIP, I think it will be a useful addition to
> Connect!
>
> +1 (binding)
>
> Cheers,
> Yash
>
> On Tue, Apr 9, 2024 at 4:23 AM Knowles Atchison Jr 
> wrote:
>
> > +1 (non binding)
> >
> > On Mon, Apr 8, 2024, 3:30 PM Chris Egerton 
> > wrote:
> >
> > > Thanks Ivan! +1 (binding) from me.
> > >
> > > On Mon, Apr 8, 2024, 06:59 Ivan Yurchenko  wrote:
> > >
> > > > Hello!
> > > >
> > > > I'd like to put the subj KIP[1] to a vote. Thank you.
> > > >
> > > > Best regards,
> > > > Ivan
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-477%3A+Add+PATCH+method+for+connector+config+in+Connect+REST+API
> > > >
> > >
> >


Re: [DISCUSS] KIP-924: customizable task assignment for Streams

2024-05-01 Thread Guozhang Wang
Jumping back to the party here :)

107: I agree with the rationale behind this, and
`numProcessingThreads` looks good to me as it covers both the current
and future scenarios.

117: I agree with Lucas and Bruno, and would add:
  * 117e: unknown taskID: fail
  * 117f: inconsistent task types (e.g. a known taskID was indicated as
stateless by ApplicationState, but the returned AssignedTask says
stateful): fail
  * 117g: some ProcessID was not included in the returned Set: pass,
and interprets it as no tasks assigned to it.

And I'm open for any creative error codes folks would come up with :)
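
Purely as a strawman, a hypothetical sketch of what such an enum could look like
(the names are illustrative, not the KIP's final API):

public enum AssignmentError {
    NONE,
    ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES,
    ACTIVE_AND_STANDBY_TASK_ASSIGNED_TO_SAME_CLIENT,
    UNKNOWN_PROCESS_ID,
    UNKNOWN_TASK_ID,
    INCONSISTENT_TASK_TYPE
}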

> If any of these errors are detected, the StreamsPartitionAssignor will 
> immediately "fail" the rebalance and retry it by scheduling an immediate 
> followup rebalance.

I'm also a bit concerned here, as I remember such endless retry loops
happening in the past. Given that we would likely see most
of the user implementations be deterministic, I'm also leaning towards
failing the app immediately and letting the crowd educate us if there are
some very interesting scenarios out there that are not on our radar and would
make us re-consider this, rather than getting hard-to-debug cases in the dark.

-

And here are just some nits about the KIP writings itself:

* I think some bullet points under `User APIs` and `Read-only APIs`
should have a lower level of indentation? It caught me for a sec until I
realized there are just two categories.

* In TaskAssignmentUtils, why not let those util functions return
`TaskAssignment` (to me it feels more consistent with the user APIs)
instead of returning a Map?


Guozhang

On Tue, Apr 30, 2024 at 5:28 PM Matthias J. Sax  wrote:
>
> I like the idea of error codes. Not sure if the name are ideal?
> UNKNOWN_PROCESS_ID makes sense, but the other two seems a little bit
> difficult to understand?
>
> Should we be very descriptive (and also try to avoid coupling it to the
> threading model -- important for the first error code):
>   - ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES
>   - ACTIVE_AND_STANDBY_ASSIGNED_TO_SAME_CLIENT (or _INSTANCE)
>
> I think we also need to add NONE as an option or make the error parameter
> an `Optional`?
>
>
> > OVERLAPPING_CLIENT : multiple KafkaStreams clients assigned with the same 
> > active task
>
> Would also be an error if assigned to two consumers of the same
> client... Needs to be rephrased.
>
>
>
> > If any of these errors are detected, the StreamsPartitionAssignor will 
> > immediately "fail" the rebalance and retry it by scheduling an immediate 
> > followup rebalance.
>
> Does this make sense? If we assume that the task-assignment is
> deterministic, we would end up with an infinite retry loop? Also,
> assuming that an client leave the group, we cannot assign some task any
> longer... I would rather throw a StreamsException and let the client crash.
>
>
>
> -Matthias
>
> On 4/30/24 12:22 PM, Sophie Blee-Goldman wrote:
> > One last thing: I added an error code enum to be returned from the
> > #onAssignmentComputed method in case of an invalid assignment. I created
> > one code for each of the invalid cases we described above. The downside is
> > that this means we'll have to go through a deprecation cycle if we want to
> > loosen up the restrictions on any of the enforced cases. The upside is that
> > we can very clearly mark what is an invalid assignment and this will
> > (hopefully) assist users who are new to customizing assignments by clearly
> > denoting the requirements, and returning a clear error if they are not
> > followed.
> >
> > Of course the StreamsPartitionAssignor will also do a "fallback & retry" in
> > this case by returning the same assignment to the consumers and scheduling
> > a followup rebalance. I've added all of this to the TaskAssignor  and
> > #onAssignmentComputed javadocs, and added a section under "Public Changes"
> > as well.
> >
> > Please let me know if there are any concerns, or if you have suggestions
> > for how else we can handle an invalid assignment
> >
> > On Tue, Apr 30, 2024 at 11:39 AM Sophie Blee-Goldman 
> > wrote:
> >
> >> Thanks guys! I agree with what Lucas said about 117c, we can always loosen
> >> a restriction later and I don't want to do anything now that might get in
> >> the way of the new threading models.
> >>
> >> With that I think we're all in agreement on 117. I'll update the KIP to
> >> include what we've discussed
> >>
> >> (and will fix the remaining #finalAssignment mention as well, thanks
> >> Bruno. Glad to have such good proof readers! :P)
> >>
> >> On Tue, Apr 30, 2024 at 8:35 AM Bruno Cadonna  wrote:
> >>
> >>> Hi again,
> >>>
> >>> I forgot to ask whether you could add the agreement about handling
> >>> invalid assignment to the KIP.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 4/30/24 2:00 PM, Bruno Cadonna wrote:
>  Hi all,
> 
>  I think we are converging!
> 
>  117
>  a) fail: Since it is an invalid consumer assignment
>  b) pass: I agree that not assigning a task might 

Re: [ANNOUNCE] New committer: Igor Soarez

2024-05-01 Thread Guozhang Wang
Congratulations!

On Tue, Apr 30, 2024 at 2:49 AM ziming deng  wrote:
>
> Congrats Igor!
>
> > On Apr 26, 2024, at 13:53, Christo Lolov  wrote:
> >
> > Congratulations Igor :) !
> >
> > On Thu, 25 Apr 2024 at 17:07, Igor Soarez  wrote:
> >
> >> Thanks everyone, I'm very honoured to join!
> >>
> >> --
> >> Igor
> >>
>


Re: [DISCUSS] KIP-1033: Add Kafka Streams exception handler for exceptions occuring during processing

2024-05-01 Thread Lianet M.
Hi all, thanks Damien for the KIP!

After looking into the KIP and comments, my only concern is aligned with
one of Matthias' comments, around the ImmutableHeaders introduction, with
the motivation not being clear enough. The existing handlers already expose
the headers (indirectly). Ex.
ProductionExceptionHandler.handleSerializationException provides the
ProducerRecord as an argument, so they are already exposed in those
callbacks through record.headers(). Is there a reason to think that it
would be a problem to expose the headers in the
new ProcessingExceptionHandler, but that it's not a problem for the
existing handler?

If there is no real concern about the KS engine requiring those headers, it
feels hard to mentally justify the complexity we transfer to the user by
exposing a new concept into the callbacks to represent the headers. In the
end, it strays away from the simple/consistent representation of Headers
used all over. Even if eventually the KS engine needs to use the headers
after the callbacks with certainty that they were not altered, still feels
like it's something we could attempt to solve internally, without having to
transfer "new concepts" into the user (ex. the deep-copy as it was
suggested, seems like the kind of trade-off that would maybe be acceptable
here to gain simplicity and consistency among the handlers with a single
existing representation of Headers).
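
For example, a minimal sketch of that deep-copy idea, using the existing header
classes (just an illustration, not a proposal for the KIP): hand the handler a
copy, so any mutation it performs cannot leak back into the engine.

import java.util.Arrays;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

final class HeaderUtils {
    static Headers deepCopy(Headers original) {
        RecordHeaders copy = new RecordHeaders();
        for (Header h : original) {
            byte[] value = h.value() == null
                ? null
                : Arrays.copyOf(h.value(), h.value().length);
            copy.add(new RecordHeader(h.key(), value));
        }
        return copy;
    }
}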

Best!

Lianet



On Tue, Apr 30, 2024 at 9:36 PM Matthias J. Sax  wrote:

> Thanks for the update.
>
> I am wondering if we should use `ReadOnlyHeaders` instead of
> `ImmutableHeaders` as interface name?
>
> Also, the returned `Header` interface is technically not immutable
> either, because `Header#value()` returns a mutable byte-array... Would we
> need a `ReadOnlyHeader` interface?
>
> If yes, it seems that `ReadOnlyHeaders` should not be a super-interface
> of `Headers` but it would rather be a standalone interface, and a
> wrapper for a `Headers` instance? And `ReadOnlyHeader` would return some
> immutable type instead of `byte[]` for the value()?
>
> An alternative would be to deep-copy the value byte-array what would not
> be free, but given that we are talking about exception handling, it
> would not be on the hot code path, and thus might be acceptable?
>
>
> The above seems to increase the complexity significantly though. Hence,
> I have second thoughts on the immutability question:
>
> Do we really need to worry about mutability after all, because in the
> end, KS runtime won't read the Headers instance after the handler was
> called, and if a user modifies the passed in headers, there won't be any
> actual damage (ie, no side effects)? For this case, it might even be ok
> to also not add `ImmutableHeaders` to begin with?
>
>
>
> Sorry for the forth and back (yes, forth and back, because back and
> forth does not make sense -- it's not logical -- just trying to fix
> English :D) as I did bring up the immutability question in the first
> place...
>
>
>
> -Matthias
>
> On 4/25/24 5:56 AM, Loic Greffier wrote:
> > Hi Matthias,
> >
> > I have updated the KIP regarding points 103 and 108.
> >
> > 103.
> > I have suggested a new `ImmutableHeaders` interface to deal with the
> > immutability concern of the headers, which is basically the `Headers`
> > interface without the write accesses.
> >
> > public interface ImmutableHeaders {
> >  Header lastHeader(String key);
> >  Iterable<Header> headers(String key);
> >  Header[] toArray();
> > }
> >
> > The `Headers` interface can be updated accordingly:
> >
> > public interface Headers extends ImmutableHeaders, Iterable<Header> {
> >  //…
> > }
> >
> > Loïc
>


Re: [DISCUSS] KIP-936 Throttle number of active PIDs

2024-05-01 Thread Igor Soarez
Hi Omnia, Hi Claude,

Thanks for putting this KIP together.
This is an important unresolved issue in Kafka,
which I have witnessed several times in production.

Please see my questions below:

10. Given the goal is to prevent OOMs, do we also need to
limit the number of KafkaPrincipals in use?

11. How would an operator know or decide to change the configuration
for the number layers – producer.id.quota.cache.layer.count –
e.g. increasing from 4 to 5; and why?
Do we need a new metric to indicate that change could be useful?

12. Is producer.id.quota.cache.cleanup.scheduler.interval.ms a
guaranteed interval, or rather simply a delay between cleanups?
How did you decide on the default value of 10ms?

13. Under "New ProducerIdQuotaManagerCache", the documentation for
the constructor params for ProducerIDQuotaManagerCache does not
match the constructor signature.

14. Under "New ProducerIdQuotaManagerCache":
  public boolean track(KafkaPrincipal principal, int producerIdRate, long pid)
How is producerIdRate used? The reference implementation Claude shared
does not use it.
https://github.com/Claudenw/kafka/blob/49b6eb0fb5cfaf19b072fd87986072a683ab976c/storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerIDQuotaManager.java

15. I could not find a description or definition for
TimestampedBloomFilter, could we add that to the KIP?

16. LayeredBloomFilter will have a fixed size (right?), but some
users (KafkaPrincipal) might only use a small number of PIDs.
Is it worth having a dual strategy, where we simply keep a Set of
PIDs until we reach a certain size where it pays off to use
the LayeredBloomFilter? (A rough sketch of this idea follows the last question below.)

17. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests",
the KIP states:

  a. INIT_PRODUCER_ID for idempotent producer request PIDs from
  random controller every time so if a client got throttled on
  one controller doesn't guarantee it will not go through on next
  controller causing OOM at the leader later.

Is the INIT_PRODUCER_ID request really sent to a "random controller"?
From a quick look at Sender.maybeSendAndPollTransactionalRequest,
for an idempotent producer, targetNode is set to the broker with the
fewest outstanding requests. Am I looking at the wrong place?

18. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests",
the KIP states:

  This solution might look simple however throttling the INIT_PRODUCER_ID
  doesn't guarantee the OOM wouldn't happened as
  (...)
  b. The problem happened on the activation of the PID when it
  produce and not at the initialisation. Which means Kafka wouldn't
  have OOM problem if the producer got assigned PID but crashed before
  producing anything.

Point b. does not seem to support the claim above?

19. Under "Rejected Alternatives" > "4. Throttle INIT_PRODUCER_ID requests",
the KIP states:

  c. Throttling producers that crash between initialisation and
  producing could slow them down when they recover/fix the
  problem that caused them to crash right after initialising PID. 

Doesn't it depend on the back-off time or how quotas are enforced?
I’m not sure this would necessarily be a problem?

20. If the allocation of PIDs for idempotent producers was
centralized, or otherwise the targetNode for that request
was predictable, would that make throttling INIT_PRODUCER_ID
a viable solution?
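
Coming back to question 16, here is a hypothetical sketch of the dual strategy
(the threshold, the sizing and the use of Guava's BloomFilter are purely
illustrative):

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.util.HashSet;
import java.util.Set;

class DualPidTracker {
    private static final int THRESHOLD = 1_000;        // illustrative cut-over point
    private final Set<Long> exactPids = new HashSet<>();
    private BloomFilter<Long> approxPids;               // null until we cut over

    // Returns true if this PID has not been seen before (approximate after cut-over).
    synchronized boolean track(long pid) {
        if (approxPids != null) {
            return approxPids.put(pid);                  // put() returns true if the filter changed
        }
        boolean isNew = exactPids.add(pid);
        if (exactPids.size() > THRESHOLD) {
            approxPids = BloomFilter.create(Funnels.longFunnel(), 1_000_000, 0.001);
            exactPids.forEach(approxPids::put);          // migrate the exact set
            exactPids.clear();
        }
        return isNew;
    }
}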


Best,

--
Igor




Re: [DISCUSS] solutions for broker OOM caused by many producer IDs

2024-05-01 Thread Omnia Ibrahim
Hi Luke and Justine. There are a few updates on KIP-936 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs
to introduce throttling on PIDs per user, and I would love to hear your feedback 
in the discussion thread 
https://lists.apache.org/thread/nxp395zmvc0s8r4ohg91kdb19dxsbxlt if you have 
time.

Thanks 
Omnia

> On 6 Jun 2023, at 15:04, Omnia Ibrahim  wrote:
> 
> Thanks, Luke for the feedback
> 
>> 1. how do we store value in bloom filter? It's unclear from this KIP that 
>> what we store inside bloom filter, and how we throttle them.
>> My understanding is, we have a map with key = kafkaPrinciple, and value = 
>> PID for each bloom filter.
>> And when new PID created for a userA, we update the map to add PID into the 
>> cache value (i.e. the bloom filter)
>> When the window passed half of the time, we created another bloom filter, 
>> and this time, when new PID comes, we check if this new PID existed in 
>> previous bloom filter, if not, we add into the new bloom filter. And in the 
>> meantime, we track the "new created" count (filtered by previous bloom 
>> filter) for throttling the users.
>> Is my understanding correct?
> 
> Not quite what I am proposing. I'm proposing 
> a cache layer to be used only for checking if we encountered the PID before 
> or not for a given KafkaPrincipal. Not as a counter.
> if the cache layer doesn't contain the PIDs I'll increment the metric sensor 
> using Sensor::record (the sensor will be created during the initial 
> interaction with this KafkaPrincipal). Sensor::record fails with 
> QuotaViolationException when we reach the max of the sensor.
> If incrementing the sensor didn't fail with QuotaViolationException then I'll 
> add the PID to the cache for the next time
> To achieve this I'm proposing this the cache layer will be represented as a 
> "cachedMap = Map>". I 
> wrapped "Map>" into 
> "TimedControlledBloomFilter" where we decide which bloom filter to write to, 
> which bloomfilter to delete, etc.
> 
> When we encounter the producer for the first time,
> the new quota manager will create the sensor and update its value.
> Then it will update the cache with this PID for the next time.
> The cache will create an entry for the user in the cachedMap
> The cache will be like this
> Map { "UserA" -> TimedBloomFilter {
>bloom_filter_1_create_timestamp -> 
> bloom_filter_1
>  }
> }
> the PID will be added to bloom_filter_1
> 2. If the producer tries to produce with the same PID the next time; the 
> quota manager will not update the sensor or the cache. And will not throttle 
> the user.
> 3. However, if the producer tries to produce with a new PID, it will be added 
> to bloom_filter_1 as long as we are within the first half of 
> producer.id.quota.window.size.seconds
> 4. If the producer sends any new PIDs after the first half of 
> producer.id.quota.window.size.seconds, we will create a new bloom filter to 
> store the PIDs from the next half of the window
> The cache will be like this
> Map { "UserA" -> TimedBloomFilter {
>bloom_filter_1_create_timestamp -> 
> bloom_filter_1
>bloom_filter_2_create_timestamp -> 
> bloom_filter_2
>  }
> }
> All PIDs from this point until the end of this window will be added to 
> bloom_filter_2.
> And both bloom_filter_1 and bloom_filter_2 will be used for checking if we 
> came across PID before or not.
> 5. a scheduler will run in the background to delete any bloom filter with 
> create_timestamp >= producer.id.quota.window.size.seconds from 
> TimedBloomFilter automatically
> 6. If the user stopped producing and all its bloom filters got deleted by the 
> scheduler the user entry will be removed from the cachedMap.
> 
> I updated the KIP to add more clarification to this caching layer.
> 
>> 2. what will user get when they can't allocate new PID due to throttling?
> 
> 
> The client will get `QuotaViolationException` similar to  ClientQuotaManager 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-936:+Throttle+number+of+active+PIDs#KIP936:ThrottlenumberofactivePIDs-ClientErrors
> 
>> 3. This config: producer.id.quota.window.num is unclear to me?
> 
> `quota.window.num` is a standard config across all Kafka quota types. I am 
> re-using the same description and default value from the existing Kafka 
> codebase (I am not a fan of the description as it's not clear). The sample here 
> is referring to the sliding time window. Kafka quotas keep 10 windows in memory 
> plus the current window, so in total there are 11.
> 
> > Finally, I think this KIP is good to have an official KIP discuss thread 
> > for community review.
> I will open the official KIP discussion today.
> 
> Thanks
> Omnia
> 
> On Tue, Jun 6, 2023 at 10:19 AM Luke Chen  > wrote:
>> Hi Omnia,
>> 
>> I finally got time to check this KIP.
>> 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #2862

2024-05-01 Thread Apache Jenkins Server
See 




[jira] [Resolved] (KAFKA-16588) broker shutdown hangs when `log.segment.delete.delay.ms` is zero

2024-05-01 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16588?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-16588.

Fix Version/s: 3.8.0
   Resolution: Fixed

> broker shutdown hangs when `log.segment.delete.delay.ms` is zero 
> -
>
> Key: KAFKA-16588
> URL: https://issues.apache.org/jira/browse/KAFKA-16588
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
> Fix For: 3.8.0
>
>
> see 
> [https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/core/src/main/scala/kafka/log/LogManager.scala#L1154]
> If `log.segment.delete.delay.ms` is zero, we call `take` even though 
> `logsToBeDeleted` is empty, and `KafkaScheduler#shutdown` calls `shutdown` 
> rather than `shutdownNow` 
> ([https://github.com/apache/kafka/blob/trunk/server-common/src/main/java/org/apache/kafka/server/util/KafkaScheduler.java#L134)]
> Hence, the thread never completes, and it blocks the shutdown of the 
> broker.
> We should replace the `take` with `poll`, since we have already checked the element 
> before.
> BTW, zero is an allowed value 
> ([https://github.com/apache/kafka/blob/f22ad6645bfec0b38e820e0090261c9f6b421a74/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java#L258])
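
A hypothetical illustration of the take-vs-poll difference on a {{BlockingQueue}} 
(not the actual {{LogManager}} code): {{take()}} blocks forever on an empty queue, 
which is what hangs the shutdown, while {{poll()}} returns immediately.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TakeVsPoll {
    public static void main(String[] args) {
        BlockingQueue<String> logsToBeDeleted = new LinkedBlockingQueue<>();

        // String next = logsToBeDeleted.take();   // would block here forever
        String next = logsToBeDeleted.poll();      // returns null right away when empty
        if (next != null) {
            System.out.println("deleting " + next);
        }
    }
}
{code}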



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16652) add unit test for

2024-05-01 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-16652:
--

 Summary: add unit test for 
 Key: KAFKA-16652
 URL: https://issues.apache.org/jira/browse/KAFKA-16652
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


https://github.com/apache/kafka/blob/31355ef8f948f369e240ebc203f889f187116d75/core/src/test/java/kafka/test/junit/ClusterTestExtensions.java#L94

If `ClusterTemplate` does not generate any `ClusterConfig`, we throw an 
exception. However, we don't have a unit test for that scenario currently.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16651) KafkaProducer.send does not throw TimeoutException as documented

2024-05-01 Thread Mike Pedersen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mike Pedersen resolved KAFKA-16651.
---
Resolution: Duplicate

> KafkaProducer.send does not throw TimeoutException as documented
> 
>
> Key: KAFKA-16651
> URL: https://issues.apache.org/jira/browse/KAFKA-16651
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.6.2
>Reporter: Mike Pedersen
>Priority: Major
>
> In the JavaDoc for {{{}KafkaProducer#send(ProducerRecord, Callback){}}}, it 
> claims that it will throw a {{TimeoutException}} if blocking on fetching 
> metadata or allocating memory and surpassing {{{}max.block.ms{}}}.
> {quote}Throws:
> {{TimeoutException}} - If the time taken for fetching metadata or allocating 
> memory for the record has surpassed max.block.ms.
> {quote}
> ([link|https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord,org.apache.kafka.clients.producer.Callback)])
> But this is not the case. As {{TimeoutException}} is an {{ApiException}} it 
> will hit [this 
> catch|https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1073-L1084]
>  which will result in a failed future being returned instead of the exception 
> being thrown.
> The "allocating memory" part likely changed as part of 
> [KAFKA-3720|https://github.com/apache/kafka/pull/8399/files#diff-43491ffa1e0f8d28db071d8c23f1a76b54f1f20ea98cf6921bfd1c77a90446abR29]
>  which changed the base exception for buffer exhaustion exceptions to 
> {{{}TimeoutException{}}}. Timing out waiting on metadata suffers the same 
> issue, but it is not clear whether this has always been the case.
> This is basically a discrepancy between documentation and behavior, so it's a 
> question of which one should be adjusted.
> And on that, being able to differentiate between synchronous timeouts (as 
> caused by waiting on metadata or allocating memory) and asynchronous timeouts 
> (eg. timing out waiting for acks) is useful. In the former case we _know_ 
> that the broker has not received the event but in the latter it _may_ be that 
> the broker has received it but the ack could not be delivered, and our 
> actions might vary because of this. The current behavior makes this hard to 
> differentiate since both result in a {{TimeoutException}} being delivered via 
> the callback. Currently, I am relying on the exception message string to 
> differentiate these two, but this is basically just relying on implementation 
> detail that may change at any time. Therefore I would suggest to either:
>  * Revert to the documented behavior of throwing in case of synchronous 
> timeouts
>  * Correct the javadoc and introduce an exception base class/interface for 
> synchronous timeouts



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16651) KafkaProducer.send does not throw TimeoutException as documented

2024-05-01 Thread Mike Pedersen (Jira)
Mike Pedersen created KAFKA-16651:
-

 Summary: KafkaProducer.send does not throw TimeoutException as 
documented
 Key: KAFKA-16651
 URL: https://issues.apache.org/jira/browse/KAFKA-16651
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 3.6.2
Reporter: Mike Pedersen


In the JavaDoc for {{KafkaProducer#send(ProducerRecord, Callback)}}, it claims 
that it will throw a {{TimeoutException}} if blocking on fetching metadata or 
allocating memory and surpassing {{max.block.ms}}.

bq. Throws:
bq. {{TimeoutException}} - If the time taken for fetching metadata or 
allocating memory for the record has surpassed max.block.ms.

([link|https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord,org.apache.kafka.clients.producer.Callback)])

But this is not the case. As {{TimeoutException}} is an {{ApiException}} it 
will hit [this 
catch|https://github.com/a0x8o/kafka/blob/54eff6af115ee647f60129f2ce6a044cb17215d0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1073-L1084]
 which will result in a failed future being returned instead of the exception 
being thrown.
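
A small sketch of the observed behaviour (illustrative, not a test from the 
codebase): the synchronous timeout surfaces through the returned future rather 
than being thrown from {{send()}}.

{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> future =
                producer.send(new ProducerRecord<>("some-topic", "value"));
            try {
                future.get();                                           // the TimeoutException surfaces here...
            } catch (ExecutionException e) {
                System.out.println("Failed future: " + e.getCause());   // ...as the cause, not thrown by send()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
{code}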

The "allocating memory" part likely changed as part of 
[KAFKA-3720|https://github.com/apache/kafka/pull/8399/files#diff-43491ffa1e0f8d28db071d8c23f1a76b54f1f20ea98cf6921bfd1c77a90446abR29]
 which changed the base exception for buffer exhaustion exceptions to 
{{TimeoutException}}. Timing out waiting on metadata suffers the same issue, 
but it is not clear whether this has always been the case.

This is basically a discrepancy between documentation and behavior, so it's a 
question of which one should be adjusted.

And on that, being able to differentiate between synchronous timeouts (as 
caused by waiting on metadata or allocating memory) and asynchronous timeouts 
(eg. timing out waiting for acks) is useful. In the former case we _know_ that 
the broker has not received the event but in the latter it _may_ be that the 
broker has received it but the ack could not be delivered, and our actions 
might vary because of this. The current behavior makes this hard to 
differentiate since both result in a {{TimeoutException}} being delivered via 
the callback. Currently, we are relying on the exception message, but this is 
basically just relying on implementation detail that may change at any time. 
Therefore I would suggest to either:

* Revert to the documented behavior of throwing in case of synchronous timeouts
* Correct the javadoc and introduce an exception base class/interface for 
synchronous timeouts



--
This message was sent by Atlassian Jira
(v8.20.10#820010)