[jira] [Resolved] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2

2024-07-29 Thread George Yang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

George Yang resolved KAFKA-17186.
-
Resolution: Not A Problem

Configuration issue; fixed.

> Cannot receive message after stopping Source Mirror Maker 2
> ---
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>Reporter: George Yang
>Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Nodes 1, 2, and 3 are deployed in Data Center A, with the MM2 service running 
> on node 1. Data Center B has a single node 1, which also runs an MM2 service. 
> Currently, a service on node 1 in Data Center A acts as a producer sending 
> messages to the `myTest` topic. A service in Data Center B acts as a consumer 
> listening to `A.myTest`. 
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer 
> in Data Center B ceases to receive messages. Even after I restart MM2 in 
> Data Center A, the consumer in Data Center B still does not receive messages 
> until approximately 5 minutes later when a rebalance occurs, at which point 
> it begins receiving messages again.
>  
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully joined group with generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully synced group in generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined 
> group at generation 52 with protocol version 2 and got assignment: 
> Assignment\{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', 
> leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], 
> taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, 
> MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], 
> delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting 
> connectors and tasks using config offset 1360 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished 
> starting connectors and tasks 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] 
> Successfully joined group with generation Generation\{generationId=143, 
> memberId='A->B-0d04e6c1-f12a-4

[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2

2024-07-29 Thread George Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869474#comment-17869474
 ] 

George Yang commented on KAFKA-17186:
-

OK, [~gharris1727], thank you for your explanation and research.

> Cannot receive message after stopping Source Mirror Maker 2
> ---
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>Reporter: George Yang
>Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Nodes 1, 2, and 3 are deployed in Data Center A, with the MM2 service running 
> on node 1. Data Center B has a single node 1, which also runs an MM2 service. 
> Currently, a service on node 1 in Data Center A acts as a producer sending 
> messages to the `myTest` topic. A service in Data Center B acts as a consumer 
> listening to `A.myTest`. 
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer 
> in Data Center B ceases to receive messages. Even after I restart MM2 in 
> Data Center A, the consumer in Data Center B still does not receive messages 
> until approximately 5 minutes later when a rebalance occurs, at which point 
> it begins receiving messages again.
>  
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully joined group with generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully synced group in generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined 
> group at generation 52 with protocol version 2 and got assignment: 
> Assignment\{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', 
> leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], 
> taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, 
> MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], 
> delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting 
> connectors and tasks using config offset 1360 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished 
> starting connectors and tasks 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] 
> Successfully joined group with generation Generation\{generatio

[jira] [Updated] (KAFKA-17215) Remove get-prefix for all getters

2024-07-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-17215:

Description: 
Kafka traditionally does not use a `get` prefix for getter methods. However, 
for multiple public interfaces, we don't follow this common pattern, but 
actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good 
opportunity to deprecate existing methods and add them back with the "correct" 
name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We 
do know of the following
 * StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, 
getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, 
getKafkaClientSupplier – for some of these, we might even consider to remove 
them; it's questionable if it makes sense to have them in the public API (cf 
[https://github.com/apache/kafka/pull/14548)] – we should also consider 
https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
 * TopologyConfig (getTaskConfig)
 * KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, 
getGlobalConsumer)
 * Contexts (maybe not worth it... we might deprecate the whole class soon):
 ** ProcessorContext (getStateStore)
 ** MockProcessorContext (getStateStore)
 ** MockProcessorContext#{color:#172b4d}CapturedPunctuator (getIntervalMs, 
getType, getPunctuator),{color}
 * api.ProcessingContext (getStateStore)
 ** api.FixedKeyProcessorContext (getStateStore)
 ** api.MockProcessorContext (getStateStore, getStateStoreContext)
 ** api.MockProcessorContext#{color:#172b4d}CapturedPunctuator (getInterval, 
getType, getPunctuator){color}
 * StateStore (getPosition)

 
 * IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o 
deprecation period, but might be nasty...)
 ** KeyQuery (getKey) 
 ** Position (getTopics, getPartitionPositions)
 ** QueryResult (getExecutionInfo, getPosition, getFailureReason, 
getFailureMessage, getResult)
 ** RangeQuery (getLowerBound, getUpperBound)
 ** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
 ** StateQueryResult (getPartitionResults, getOnlyPartitionResult, 
getGlobalResult, getPosition)
 ** WindowKeyQuery (getKey, getTimeFrom, getTimeTo, 
 ** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)

 
 * TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, 
getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, 
getTimestampedWindowStore, getSessionStore)
 * TestOutputTopic (getQueueSize)

  was:
Kafka traditionally does not use a `get` prefix for getter methods. However, 
for multiple public interfaces, we don't follow this common pattern, but 
actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good 
opportunity to deprecate existing methods and add them back with the "correct" 
name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We 
do know of the following
 * StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, 
getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, 
getKafkaClientSupplier – for some of these, we might even consider to remove 
them; it's questionable if it makes sense to have them in the public API (cf 
[https://github.com/apache/kafka/pull/14548)] – we should also consider 
https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
 * TopologyConfig (getTaskConfig)
 * KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, 
getGlobalConsumer)
 * Contexts (maybe not worth it... we might deprecate the whole class soon):
 ** ProcessorContext (getStateStore)
 ** MockProcessorContext (getStateStore)
 ** MockProcessorContext#{color:#172b4d}CapturedPunctuator (getIntervalMs, 
getType, getPunctuator),{color}
 * api.ProcessingContext (getStateStore)
 ** api.FixedKeyProcessorContext (getStateStore)
 ** api.MockProcessorContext (getStateStore)
 ** api.MockProcessorContext#{color:#172b4d}CapturedPunctuator (getInterval, 
getType, getPunctuator){color}
 * StateStore (getPosition)

 
 * IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o 
deprecation period, but might be nasty...)
 ** KeyQuery (getKey) 
 ** Position (getTopics, getPartitionPositions)
 ** QueryResult (getExecutionInfo, getPosition, getFailureReason, 
getFailureMessage, getResult)
 ** RangeQuery (getLowerBound, getUpperBound)
 ** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
 ** StateQueryResult (getPartitionResults, getOnlyPartitionResult, 
getGlobalResult, getPosition)
 ** WindowKeyQuery (getKey, getTimeFrom, getTimeTo, 
 ** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)

 
 * TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, 
getTimestampedKeyValueStore, getVersionedKeyValueS

[jira] [Updated] (KAFKA-17215) Remove get-prefix for all getters

2024-07-29 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-17215:

Description: 
Kafka traditionally does not use a `get` prefix for getter methods. However, 
for multiple public interfaces, we don't follow this common pattern, but 
actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good 
opportunity to deprecate existing methods and add them back with the "correct" 
name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We 
do know of the following
 * StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, 
getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, 
getKafkaClientSupplier – for some of these, we might even consider to remove 
them; it's questionable if it makes sense to have them in the public API (cf 
[https://github.com/apache/kafka/pull/14548)] – we should also consider 
https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
 * TopologyConfig (getTaskConfig)
 * KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, 
getGlobalConsumer)
 * Contexts (maybe not worth it... we might deprecate the whole class soon):
 ** ProcessorContext (getStateStore)
 ** MockProcessorContext (getStateStore)
 ** MockProcessorContext#{color:#172b4d}CapturedPunctuator (getIntervalMs, 
getType, getPunctuator),{color}
 * api.ProcessingContext (getStateStore)
 ** api.FixedKeyProcessorContext (getStateStore)
 ** api.MockProcessorContext (getStateStore)
 ** api.MockProcessorContext#{color:#172b4d}CapturedPunctuator (getInterval, 
getType, getPunctuator){color}
 * StateStore (getPosition)

 
 * IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o 
deprecation period, but might be nasty...)
 ** KeyQuery (getKey) 
 ** Position (getTopics, getPartitionPositions)
 ** QueryResult (getExecutionInfo, getPosition, getFailureReason, 
getFailureMessage, getResult)
 ** RangeQuery (getLowerBound, getUpperBound)
 ** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
 ** StateQueryResult (getPartitionResults, getOnlyPartitionResult, 
getGlobalResult, getPosition)
 ** WindowKeyQuery (getKey, getTimeFrom, getTimeTo, 
 ** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)

 
 * TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, 
getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, 
getTimestampedWindowStore, getSessionStore)
 * TestOutputTopic (getQueueSize)

  was:
Kafka traditionally does not use a `get` prefix for getter methods. However, 
for multiple public interfaces, we don't follow this common pattern, but 
actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good 
opportunity to deprecate existing methods and add them back with the "correct" 
name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We 
do know of the following
 * StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, 
getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, 
getKafkaClientSupplier – for some of these, we might even consider to remove 
them; it's questionable if it makes sense to have them in the public API (cf 
[https://github.com/apache/kafka/pull/14548)] – we should also consider 
https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
 * TopologyConfig (getTaskConfig)
 * KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, 
getGlobalConsumer)
 * Contexts (maybe not worth it... we might deprecate the whole class soon):
 ** ProcessorContext (getStateStore)
 ** MockProcessorContext (getStateStore)
 * 
api.ProcessingContext (getStateStore)
 ** 
api.FixedKeyProcessorContext (getStateStore)
 ** 
api.MockProcessorContext (getStateStore)
 * StateStore (getPosition)

 
 * IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o 
deprecation period, but might be nasty...)
 ** 
KeyQuery (getKey) 
 ** Position (getTopics, getPartitionPositions)
 ** QueryResult (getExecutionInfo, getPosition, getFailureReason, 
getFailureMessage, getResult)
 ** RangeQuery (getLowerBound, getUpperBound)
 ** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
 ** StateQueryResult (getPartitionResults, getOnlyPartitionResult, 
getGlobalResult, getPosition)
 ** WindowKeyQuery (getKey, getTimeFrom, getTimeTo, 
 ** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)

 
 * TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, 
getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, 
getTimestampedWindowStore, getSessionStore)
 * TestOutputTopic (getQueueSize)


> Remove get-prefix for all getters
> -
>
> Key: KAFKA-17215
>

[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2

2024-07-29 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869468#comment-17869468
 ] 

Greg Harris commented on KAFKA-17186:
-

I don't think I've ever thought about it like that, but your system does make 
some sense. The reality is less perfect :)

I looked at this implementation: 
[https://github.com/apache/kafka/blob/2cf87bff9b6b5136a22539edf48b2d7cc668bdf9/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L180]
 which forwards all .* configurations to the worker configuration, 
which is the input to the DistributedConfig. It just happens to not include 
"distributed" in the prefix.

The producer/consumer/admin pattern is something that the Worker/WorkerConfig 
specifies: 
[https://github.com/apache/kafka/blob/0ec520a2af05ecd6956b5e645e257d72ad5b8fbd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L833]
 and MirrorMaker2 doesn't follow the same pattern.
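
For context, a minimal mm2.properties sketch matching the A->B topology 
described in this issue; the property names are standard MM2 configuration, 
while the hostnames and the tuning value are placeholders:

{code}
clusters = A, B

# Placeholder hosts for the two data centers described in this issue.
A.bootstrap.servers = dcA-node1:9092,dcA-node2:9092,dcA-node3:9092
B.bootstrap.servers = dcB-node1:9092

# Replicate myTest from A to B; it shows up on B as A.myTest.
A->B.enabled = true
A->B.topics = myTest

# Per the comment above, cluster-prefixed worker settings (no "distributed" in
# the prefix) are forwarded into that cluster's Connect worker/DistributedConfig.
B.offset.flush.timeout.ms = 10000
{code}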

> Cannot receive message after stopping Source Mirror Maker 2
> ---
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>Reporter: George Yang
>Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Nodes 1, 2, and 3 are deployed in Data Center A, with the MM2 service running 
> on node 1. Data Center B has a single node 1, which also runs an MM2 service. 
> Currently, a service on node 1 in Data Center A acts as a producer sending 
> messages to the `myTest` topic. A service in Data Center B acts as a consumer 
> listening to `A.myTest`. 
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer 
> in Data Center B ceases to receive messages. Even after I restart MM2 in 
> Data Center A, the consumer in Data Center B still does not receive messages 
> until approximately 5 minutes later when a rebalance occurs, at which point 
> it begins receiving messages again.
>  
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully joined group with generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully synced group in generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined 
> group at generation 52 with protocol version 2 and got assignment: 
> Assignment\{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', 
> leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], 
> taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, 
> MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], 
> delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting 
> connectors and tasks using config offset 1360 
> (org.apache.kafk

[jira] [Created] (KAFKA-17215) Remove get-prefix for all getters

2024-07-29 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-17215:
---

 Summary: Remove get-prefix for all getters
 Key: KAFKA-17215
 URL: https://issues.apache.org/jira/browse/KAFKA-17215
 Project: Kafka
  Issue Type: Improvement
  Components: streams, streams-test-utils
Reporter: Matthias J. Sax


Kafka traditionally does not use a `get` prefix for getter methods. However, 
for multiple public interfaces, we don't follow this common pattern, but 
actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good 
opportunity to deprecate existing methods and add them back with the "correct" 
name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We 
do know of the following
 * StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, 
getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, 
getKafkaClientSupplier – for some of these, we might even consider to remove 
them; it's questionable if it makes sense to have them in the public API (cf 
[https://github.com/apache/kafka/pull/14548)] – we should also consider 
https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
 * TopologyConfig (getTaskConfig)
 * KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, 
getGlobalConsumer)
 * Contexts (maybe not worth it... we might deprecate the whole class soon):
 ** ProcessorContext (getStateStore)
 ** MockProcessorContext (getStateStore)
 * 
api.ProcessingContext (getStateStore)
 ** 
api.FixedKeyProcessorContext (getStateStore)
 ** 
api.MockProcessorContext (getStateStore)
 * StateStore (getPosition)

 
 * IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o 
deprecation period, but might be nasty...)
 ** 
KeyQuery (getKey) 
 ** Position (getTopics, getPartitionPositions)
 ** QueryResult (getExecutionInfo, getPosition, getFailureReason, 
getFailureMessage, getResult)
 ** RangeQuery (getLowerBound, getUpperBound)
 ** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
 ** StateQueryResult (getPartitionResults, getOnlyPartitionResult, 
getGlobalResult, getPosition)
 ** WindowKeyQuery (getKey, getTimeFrom, getTimeTo, 
 ** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)

 
 * TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, 
getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, 
getTimestampedWindowStore, getSessionStore)
 * TestOutputTopic (getQueueSize)
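
A minimal sketch of the deprecate-and-rename step described above; the 
interface here is hypothetical and only illustrates the pattern, not any 
actual Kafka Streams class:

{code:java}
// Hypothetical interface, only to illustrate the deprecate-then-rename pattern;
// it is not any of the actual Kafka Streams classes listed above.
public interface ExampleQuery<K> {

    // New accessor without the get-prefix.
    K key();

    // Old accessor kept through the deprecation period, delegating to the new one.
    @Deprecated
    default K getKey() {
        return key();
    }
}
{code}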



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17211) kafka producer get stuck by Uncaught error in kafka producer I/O thread

2024-07-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17211:
--
Component/s: producer 

> kafka producer get stuck by Uncaught error in kafka producer I/O thread
> ---
>
> Key: KAFKA-17211
> URL: https://issues.apache.org/jira/browse/KAFKA-17211
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.8.2
>Reporter: yapeng
>Priority: Major
>
> I have a Kafka producer that kept reporting this error message indefinitely.
> {code:java}
> [Producer clientId=xxx] Uncaught error in kafka producer I/O 
> thread: 
> java.util.NoSuchElementException: null
>   at java.util.ArrayDeque.getFirst(ArrayDeque.java:329)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.expiredBatches(RecordAccumulator.java:306)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:372)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:326)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:255)
>   at java.lang.Thread.run(Thread.java:750){code}
> It seems weird that there are null values in the Deque. The machine has been 
> restarted to resolve online issues, so there is no heap dump.
> Please help give some insights about this issue.
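
A generic illustration (not Kafka's RecordAccumulator code) of how this 
exception can surface: {{ArrayDeque.getFirst()}} throws 
{{NoSuchElementException}} if the deque becomes empty between an emptiness 
check and the call, e.g. due to concurrent access:

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Generic illustration (not Kafka code): getFirst() throws
// java.util.NoSuchElementException when the deque is empty at call time, e.g.
// because another thread drained it between the isEmpty() check and the call.
public class DequeRace {
    public static void main(String[] args) {
        Deque<String> batches = new ArrayDeque<>();
        batches.add("batch-0");

        if (!batches.isEmpty()) {
            batches.clear();       // simulate a concurrent drain by another thread
            batches.getFirst();    // throws NoSuchElementException, as in the report
        }
    }
}
{code}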



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close

2024-07-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869467#comment-17869467
 ] 

Kirk True commented on KAFKA-17116:
---

My understanding is:
 # This is an edge case
 # The broker contains logic to remove stale members

If the above two statements are true, what's the downside of letting the broker 
handle the member as it does today, namely removing it after some configured 
amount of time?

I agree that it makes sense to augment the client logic to only send the 'leave 
group' request if we have a member ID. At this point, I'm -1 on the idea of 
introducing temporary IDs to handle edge cases like this.
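
A rough sketch of the suggested guard; the class and method names are 
hypothetical, not the actual consumer internals:

{code:java}
// Hypothetical sketch of the client-side guard suggested above: only send the
// "leave group" heartbeat (epoch -1) once a member ID has been received.
public class LeaveGroupGuard {

    public static boolean shouldSendLeaveGroup(String memberId) {
        // Without a member ID the broker never acknowledged the join, so a
        // leave request could only fail with UNKNOWN_MEMBER_ID.
        return memberId != null && !memberId.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(shouldSendLeaveGroup(null));        // false: skip the leave HB
        System.out.println(shouldSendLeaveGroup("member-42")); // true: send HB with epoch -1
    }
}
{code}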

> New consumer may not send effective leave group if member ID received after 
> close 
> --
>
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the new consumer is closed after sending a HB to join, but before 
> receiving the response to it, it will send a leave group request but without 
> member ID (it will simply fail with UNKNOWN_MEMBER_ID). As a result, the 
> broker will have a newly registered member for which it will never receive a 
> leave request.
>  # consumer.subscribe -> sends HB to join, transitions to JOINING
>  # consumer.close -> will transition to LEAVING and send HB with epoch -1 
> (without waiting for in-flight requests)
>  # consumer receives response to initial HB, containing the assigned member 
> ID. It will simply ignore it because it's not in the group anymore 
> (UNSUBSCRIBED)
> Note that with the current logic the expected behavior, and the main 
> downsides of this, are:
>  # If the member received partitions on the first HB, those partitions won't 
> be re-assigned (the broker waits for the closed consumer to reconcile them) 
> until the rebalance timeout expires. 
>  # Even if no partitions were assigned to it, the member will remain in the 
> group from the broker's point of view (but not from the client's POV). The 
> member will eventually be kicked out for not sending HBs, but only when its 
> session timeout expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer

2024-07-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16390:
--
Component/s: clients

> consume_bench_test.py failed using AsyncKafkaConsumer
> -
>
> Key: KAFKA-16390
> URL: https://issues.apache.org/jira/browse/KAFKA-16390
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Reporter: Philip Nee
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273
> The following tests failed using the consumer group protocol
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
>  {code}
> Because of
> {code:java}
>  TimeoutError('consume_workload failed to finish in the expected amount of 
> time.')
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", 
> line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 
> 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the 
> expected amount of time. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer

2024-07-29 Thread Kirk True (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869464#comment-17869464
 ] 

Kirk True commented on KAFKA-16390:
---

[~frankvicky] transferring the Jira to you and marking as Patch Available. 
Thanks!

> consume_bench_test.py failed using AsyncKafkaConsumer
> -
>
> Key: KAFKA-16390
> URL: https://issues.apache.org/jira/browse/KAFKA-16390
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Philip Nee
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273
> The following tests failed using the consumer group protocol
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
>  {code}
> Because of
> {code:java}
>  TimeoutError('consume_workload failed to finish in the expected amount of 
> time.')
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", 
> line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 
> 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the 
> expected amount of time. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer

2024-07-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16390:
-

Assignee: TengYao Chi  (was: Kirk True)

> consume_bench_test.py failed using AsyncKafkaConsumer
> -
>
> Key: KAFKA-16390
> URL: https://issues.apache.org/jira/browse/KAFKA-16390
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Philip Nee
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273
> The following tests failed using the consumer group protocol
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
>  {code}
> Because of
> {code:java}
>  TimeoutError('consume_workload failed to finish in the expected amount of 
> time.')
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", 
> line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 
> 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the 
> expected amount of time. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer

2024-07-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17209:
--
Labels: kip-848-client-support  (was: )

> Revisit testCurrentLag for AsyncKafkaConsumer
> -
>
> Key: KAFKA-17209
> URL: https://issues.apache.org/jira/browse/KAFKA-17209
> Project: Kafka
>  Issue Type: Test
>  Components: clients, unit tests
>Reporter: TaiJuWu
>Assignee: TaiJuWu
>Priority: Major
>  Labels: kip-848-client-support
>
> The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that 
> ClassicConsumer is single-threaded while AsyncConsumer uses two threads: an 
> application thread and a background thread.
> The {{Fetch}} request is generated by the background thread and the 
> {{ListOffset}} request is generated by the application thread.
> This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} 
> request first.
> So we add out-of-order response support to {{MockClient}} to resolve the issue.
> Another difference between AsyncConsumer and ClassicConsumer is that the 
> former generates an additional {{FetchOffset}} request from 
> CommitRequestManager, while ClassicConsumer does not send that request.
> The reason is that the coordinator is not ready during the test for 
> {{ClassicConsumer}}, so we can ignore this request for {{AsyncConsumer}}.
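
A generic sketch of the out-of-order matching idea, using hypothetical types 
rather than MockClient's real API: queued responses are matched to requests by 
predicate instead of strict FIFO order, so the Fetch and ListOffsets requests 
can arrive in either order.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.function.Predicate;

// Generic illustration (hypothetical, not MockClient's real API): answer each
// incoming request with the first queued response whose predicate matches it,
// rather than answering strictly in arrival order.
public class OutOfOrderResponder<Req, Resp> {

    private static final class Pending<Req, Resp> {
        final Predicate<Req> matcher;
        final Resp response;

        Pending(Predicate<Req> matcher, Resp response) {
            this.matcher = matcher;
            this.response = response;
        }
    }

    private final Deque<Pending<Req, Resp>> pending = new ArrayDeque<>();

    // Queue a response together with the predicate that identifies its request.
    public void prepare(Predicate<Req> matcher, Resp response) {
        pending.add(new Pending<>(matcher, response));
    }

    // Return the first queued response whose predicate accepts this request.
    public Resp respond(Req request) {
        Iterator<Pending<Req, Resp>> it = pending.iterator();
        while (it.hasNext()) {
            Pending<Req, Resp> p = it.next();
            if (p.matcher.test(request)) {
                it.remove();
                return p.response;
            }
        }
        throw new IllegalStateException("No prepared response matches " + request);
    }
}
{code}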



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer

2024-07-29 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17209:
--
Component/s: clients
 unit tests

> Revisit testCurrentLag for AsyncKafkaConsumer
> -
>
> Key: KAFKA-17209
> URL: https://issues.apache.org/jira/browse/KAFKA-17209
> Project: Kafka
>  Issue Type: Test
>  Components: clients, unit tests
>Reporter: TaiJuWu
>Assignee: TaiJuWu
>Priority: Major
>
> The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that 
> ClassicConsumer is single-threaded while AsyncConsumer uses two threads: an 
> application thread and a background thread.
> The {{Fetch}} request is generated by the background thread and the 
> {{ListOffset}} request is generated by the application thread.
> This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} 
> request first.
> So we add out-of-order response support to {{MockClient}} to resolve the issue.
> Another difference between AsyncConsumer and ClassicConsumer is that the 
> former generates an additional {{FetchOffset}} request from 
> CommitRequestManager, while ClassicConsumer does not send that request.
> The reason is that the coordinator is not ready during the test for 
> {{ClassicConsumer}}, so we can ignore this request for {{AsyncConsumer}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16883) Zookeeper-Kraft failing migration - RPC got timed out before it could be sent

2024-07-29 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869447#comment-17869447
 ] 

David Arthur commented on KAFKA-16883:
--

Thanks for confirming your success here, [~nicolas.henneaux].

 

Looking at the changelog between 3.7.0 and 3.7.1, there were a handful of fixes 
related to metadata/controller and one fix specific to migrations (KAFKA-16563).

 

> Zookeeper-Kraft failing migration - RPC got timed out before it could be sent
> -
>
> Key: KAFKA-16883
> URL: https://issues.apache.org/jira/browse/KAFKA-16883
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0, 3.6.1, 3.6.2
>Reporter: Nicolas Henneaux
>Priority: Major
> Fix For: 3.7.1
>
>
> Despite several attempts to migrate from a ZooKeeper cluster to KRaft, the 
> migration did not complete properly.
> We spawned a new, fully healthy cluster with 3 Kafka nodes connected to 3 
> ZooKeeper nodes. 3 additional Kafka nodes are there for the new controllers.
> It was tested with Kafka 3.6.1, 3.6.2 and 3.7.0.
> It might be linked to KAFKA-15330.
> The controllers start without issue. When the brokers are then configured for 
> the migration, the migration does not start. Once the last broker is 
> restarted, we get the following logs.
> {code:java}
> [2024-06-03 15:11:48,192] INFO [ReplicaFetcherThread-0-11]: Stopped 
> (kafka.server.ReplicaFetcherThread)
> [2024-06-03 15:11:48,193] INFO [ReplicaFetcherThread-0-11]: Shutdown 
> completed (kafka.server.ReplicaFetcherThread)
> {code}
> Then we only get the following every 30s
> {code:java}
> [2024-06-03 15:12:04,163] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager)
> [2024-06-03 15:12:34,297] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager)
> [2024-06-03 15:13:04,536] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager){code}
> The config on the controller node is the following
> {code:java}
> kafka0202e1 ~]$  sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | 
> grep -v password | sort
> advertised.host.name=kafka0202e1.ahub.sb.eu.ginfra.net
> broker.rack=e1
> controller.listener.names=CONTROLLER
> controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093
> default.replication.factor=3
> delete.topic.enable=false
> group.initial.rebalance.delay.ms=3000
> inter.broker.protocol.version=3.7
> listeners=CONTROLLER://kafka0202e1.ahub.sb.eu.ginfra.net:9093
> listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> log.dirs=/data/kafka
> log.message.format.version=3.6
> log.retention.check.interval.ms=30
> log.retention.hours=240
> log.segment.bytes=1073741824
> min.insync.replicas=2
> node.id=20
> num.io.threads=8
> num.network.threads=3
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=3
> process.roles=controller
> security.inter.broker.protocol=SSL
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> ssl.cipher.suites=TLS_AES_256_GCM_SHA384
> ssl.client.auth=required
> ssl.enabled.protocols=TLSv1.3
> ssl.endpoint.identification.algorithm=HTTPS
> ssl.keystore.location=/etc/kafka/ssl/keystore.ts
> ssl.keystore.type=JKS
> ssl.secure.random.implementation=SHA1PRNG
> ssl.truststore.location=/etc/kafka/ssl/truststore.ts
> transaction.state.log.min.isr=3
> transaction.state.log.replication.factor=3
> unclean.leader.election.enable=false
> zookeeper.connect=10.135.65.199:2181,10.133.65.199:2181,10.137.64.56:2181,
> zookeeper.metadata.migration.enable=true
>  {code}
> The config on the broker node is the following
> {code}
> $ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | grep -v 
> password | sort
> advertised.host.name=kafka0201e3.ahub.sb.eu.ginfra.net
> advertised.listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092
> broker.id=12
> broker.rack=e3
> controller.listener.names=CONTROLLER # added once all controllers were started
> controller.quor

[jira] [Closed] (KAFKA-16883) Zookeeper-Kraft failing migration - RPC got timed out before it could be sent

2024-07-29 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur closed KAFKA-16883.


> Zookeeper-Kraft failing migration - RPC got timed out before it could be sent
> -
>
> Key: KAFKA-16883
> URL: https://issues.apache.org/jira/browse/KAFKA-16883
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0, 3.6.1, 3.6.2
>Reporter: Nicolas Henneaux
>Priority: Major
> Fix For: 3.7.1
>
>
> Despite several attempts to migrate from a ZooKeeper cluster to KRaft, the 
> migration did not complete properly.
> We spawned a new, fully healthy cluster with 3 Kafka nodes connected to 3 
> ZooKeeper nodes. 3 additional Kafka nodes are there for the new controllers.
> It was tested with Kafka 3.6.1, 3.6.2 and 3.7.0.
> It might be linked to KAFKA-15330.
> The controllers start without issue. When the brokers are then configured for 
> the migration, the migration does not start. Once the last broker is 
> restarted, we get the following logs.
> {code:java}
> [2024-06-03 15:11:48,192] INFO [ReplicaFetcherThread-0-11]: Stopped 
> (kafka.server.ReplicaFetcherThread)
> [2024-06-03 15:11:48,193] INFO [ReplicaFetcherThread-0-11]: Shutdown 
> completed (kafka.server.ReplicaFetcherThread)
> {code}
> Then we only get the following every 30s
> {code:java}
> [2024-06-03 15:12:04,163] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager)
> [2024-06-03 15:12:34,297] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager)
> [2024-06-03 15:13:04,536] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager){code}
> The config on the controller node is the following
> {code:java}
> kafka0202e1 ~]$  sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | 
> grep -v password | sort
> advertised.host.name=kafka0202e1.ahub.sb.eu.ginfra.net
> broker.rack=e1
> controller.listener.names=CONTROLLER
> controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093
> default.replication.factor=3
> delete.topic.enable=false
> group.initial.rebalance.delay.ms=3000
> inter.broker.protocol.version=3.7
> listeners=CONTROLLER://kafka0202e1.ahub.sb.eu.ginfra.net:9093
> listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> log.dirs=/data/kafka
> log.message.format.version=3.6
> log.retention.check.interval.ms=30
> log.retention.hours=240
> log.segment.bytes=1073741824
> min.insync.replicas=2
> node.id=20
> num.io.threads=8
> num.network.threads=3
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=3
> process.roles=controller
> security.inter.broker.protocol=SSL
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> ssl.cipher.suites=TLS_AES_256_GCM_SHA384
> ssl.client.auth=required
> ssl.enabled.protocols=TLSv1.3
> ssl.endpoint.identification.algorithm=HTTPS
> ssl.keystore.location=/etc/kafka/ssl/keystore.ts
> ssl.keystore.type=JKS
> ssl.secure.random.implementation=SHA1PRNG
> ssl.truststore.location=/etc/kafka/ssl/truststore.ts
> transaction.state.log.min.isr=3
> transaction.state.log.replication.factor=3
> unclean.leader.election.enable=false
> zookeeper.connect=10.135.65.199:2181,10.133.65.199:2181,10.137.64.56:2181,
> zookeeper.metadata.migration.enable=true
>  {code}
> The config on the broker node is the following
> {code}
> $ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | grep -v 
> password | sort
> advertised.host.name=kafka0201e3.ahub.sb.eu.ginfra.net
> advertised.listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092
> broker.id=12
> broker.rack=e3
> controller.listener.names=CONTROLLER # added once all controllers were started
> controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093
>  # added once all controllers were started
> default.replication.factor=3
> delete.topic.enable=false
> group.initial.rebalan

[jira] [Commented] (KAFKA-17146) ZK to KRAFT migration stuck in pre-migration mode

2024-07-29 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869445#comment-17869445
 ] 

David Arthur commented on KAFKA-17146:
--

[~saimon46] thanks for the report.

> More than that, the new controller claims to be the CONTROLLER but it refuses 
> to be it.

This is by design, but the active KRaft controller should not stay in this 
state indefinitely. The KRaft controller will claim the ZK active controller 
role by writing its node ID into the /controller ZNode. It will then perform 
the migration of the metadata. There is a period of time where the active KRaft 
controller will be returning NOT_CONTROLLER to RPCs (while the migration is 
happening).
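
A quick way to see which node currently holds that claim is to read the 
/controller ZNode with the standard ZooKeeper shell (host is a placeholder):

{code}
# Inspect which node currently claims the ZooKeeper controller role during the
# migration; the node ID is part of the znode payload (fields vary by version).
$ bin/zookeeper-shell.sh <zk-host>:2181 get /controller
{code}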

 

Can you provide the full log for the active KRaft controller? If it's too 
large, just messages from KRaftMigrationDriver should shed some light on the 
situation. 

> ZK to KRAFT migration stuck in pre-migration mode
> -
>
> Key: KAFKA-17146
> URL: https://issues.apache.org/jira/browse/KAFKA-17146
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft, migration
>Affects Versions: 3.7.0, 3.7.1
> Environment: Virtual machines isolated: 3 VMs with Kafka brokers + 3 
> Zookeeper/KRAFT
>Reporter: Simone Brundu
>Priority: Blocker
>  Labels: kraft, migration, zookeeper
>
> I'm facing a migration from ZooKeeper to KRaft with a Kafka 3.7.1 cluster. 
> (EDIT: the same issue happens for version 3.7.0)
> I'm using this configuration to allow SSL everywhere, SCRAM authentication 
> for brokers, and PLAIN authentication for controllers:
> {code:java}
> listener.security.protocol.map=EXTERNAL_SASL:SASL_SSL,CONTROLLER:SASL_SSL
> inter.broker.listener.name=EXTERNAL_SASL
> sasl.enabled.mechanisms=SCRAM-SHA-512,PLAIN
> sasl.mechanism=SCRAM-SHA-512
> sasl.mechanism.controller.protocol=PLAIN
> sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 {code}
> The cluster initially has 3 brokers and 3 ZooKeeper nodes; a controller 
> quorum with 3 KRaft controllers is then configured and runs in parallel, as 
> per the documentation for the migration process.
> I've started the migration with the 3 controllers enrolled with SASL_SSL and 
> PLAIN authentication, and I already see a strange TRACE log:
> {code:java}
> TRACE [KRaftMigrationDriver id=3000] Received metadata delta, but the 
> controller is not in dual-write mode. Ignoring the change to be replicated to 
> Zookeeper (org.apache.kafka.metadata.migration.KRaftMigrationDriver) {code}
> Later we see this message, where KRaft is waiting for brokers to connect:
> {code:java}
> INFO [KRaftMigrationDriver id=1000] No brokers are known to KRaft, waiting 
> for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) {code}
> As soon as I start reconfiguring the brokers to connect to the new 
> controllers, all looks good on the KRaft controllers, with notifications that 
> the brokers connected and registered correctly:
> {code:java}
> INFO [QuorumController id=1000] Replayed initial RegisterBrokerRecord for 
> broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=true, 
> incarnationId=xx, brokerEpoch=2638, 
> endPoints=[BrokerEndpoint(name='EXTERNAL_SASL', host='vmk-tdtkafka-01', 
> port=9095, securityProtocol=3)], 
> features=[BrokerFeature(name='metadata.version', minSupportedVersion=19, 
> maxSupportedVersion=19)], rack='zur1', fenced=true, 
> inControlledShutdown=false, logDirs=[xx]) 
> (org.apache.kafka.controller.ClusterControlManager)
> [...]
> INFO [KRaftMigrationDriver id=1000] Still waiting for ZK brokers [2, 3] to 
> register with KRaft. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
> [...]
> INFO [KRaftMigrationDriver id=1000] Still waiting for ZK brokers [2] to 
> register with KRaft. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) {code}
> As soon as the first broker is connected, we start to get these INFO logs 
> related to the migration process in the controller:
> {code:java}
> INFO [QuorumController id=1000] Cannot run write operation maybeFenceReplicas 
> in pre-migration mode. Returning NOT_CONTROLLER. 
> (org.apache.kafka.controller.QuorumController)
> INFO [QuorumController id=1000] maybeFenceReplicas: event failed with 
> NotControllerException in 355 microseconds. Exception message: The controller 
> is in pre-migration mode. (org.apache.kafka.controller.QuorumController){code}
> but also requests to auto-create topics that already exist, in a loop every 
> 30 seconds, on the last restarted broker

[jira] [Resolved] (KAFKA-15522) Flaky test org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs

2024-07-29 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-15522.
---
Resolution: Fixed

> Flaky test 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs
> --
>
> Key: KAFKA-15522
> URL: https://issues.apache.org/jira/browse/KAFKA-15522
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky, flaky-test
>
> h3. Last seen: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOneWayReplicationWithFrequentOffsetSyncs__/
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out. at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.startClusters(MirrorConnectorsIntegrationExactlyOnceTest.java:51)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568) at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>  at 
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:521)
>  at 
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeBeforeEachMethodAdapter$23(ClassBasedTestDescriptor.java:506)
>  at 
> org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachMethods$3(TestMethodTestDescriptor.java:175)
>  at 
> org.junit.jupiter.engine.descriptor.TestMethod

[jira] [Commented] (KAFKA-15522) Flaky test org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs

2024-07-29 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869429#comment-17869429
 ] 

Chris Egerton commented on KAFKA-15522:
---

We haven't seen this failure in a while and the stack trace in the description 
indicates it was probably a client/broker issue (or just extremely overloaded 
testing hardware). I think this is safe to close.

> Flaky test 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs
> --
>
> Key: KAFKA-15522
> URL: https://issues.apache.org/jira/browse/KAFKA-15522
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.6.0, 3.5.1
>Reporter: Josep Prat
>Priority: Major
>  Labels: flaky, flaky-test
>
> h3. Last seen: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOneWayReplicationWithFrequentOffsetSyncs__/
> h3. Error Message
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.{code}
> h3. Stacktrace
> {code:java}
> java.lang.RuntimeException: java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: The request timed out. at 
> org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.startClusters(MirrorConnectorsIntegrationExactlyOnceTest.java:51)
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base/java.lang.reflect.Method.invoke(Method.java:568) at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>  at 
> org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>  at 
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:521)
>  at 
> org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeBeforeEachMethodAdapter$23(ClassBasedTestDescriptor.java:506)
>  at 
> org.junit.jupiter.engine.descriptor.TestMetho

[jira] [Created] (KAFKA-17214) Add 3.8.0 Streams and Core to system tests

2024-07-29 Thread Josep Prat (Jira)
Josep Prat created KAFKA-17214:
--

 Summary: Add 3.8.0 Streams and Core to system tests
 Key: KAFKA-17214
 URL: https://issues.apache.org/jira/browse/KAFKA-17214
 Project: Kafka
  Issue Type: Bug
Reporter: Josep Prat


As per the Release Instructions, we should add the 3.8.0 version to the system 
tests. Example PRs:
 * Broker and clients: [https://github.com/apache/kafka/pull/12210]
 * Streams: [https://github.com/apache/kafka/pull/12209]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17213) Change ControllerMovedException from ApiException to InvalidMetadataException.

2024-07-29 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-17213:
---
Fix Version/s: (was: 3.8.0)

> Change ControllerMovedException from ApiException to InvalidMetadataException.
> --
>
> Key: KAFKA-17213
> URL: https://issues.apache.org/jira/browse/KAFKA-17213
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: xiaochen.zhou
>Priority: Minor
>
> After the Kafka client fails to send, it will update the metadata. The 
> {{InvalidMetadataException}} is retryable. I think the Controller information 
> also belongs to Kafka's metadata, so {{ControllerMovedException}} should be 
> {{{}InvalidMetadataException{}}}.
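
For illustration, a minimal sketch of what the proposed re-parenting could look 
like, assuming the existing constructors are kept (this is not the final patch):
{code:java}
package org.apache.kafka.common.errors;

// Hypothetical sketch of the proposal: extend the retriable
// InvalidMetadataException instead of ApiException, so clients refresh
// metadata and retry rather than failing fast.
public class ControllerMovedException extends InvalidMetadataException {

    public ControllerMovedException(String message) {
        super(message);
    }

    public ControllerMovedException(String message, Throwable cause) {
        super(message, cause);
    }
}
{code}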



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers

2024-07-29 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17202.

Fix Version/s: 3.9.0
   Resolution: Fixed

> EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset 
> leaks consumers
> --
>
> Key: KAFKA-17202
> URL: https://issues.apache.org/jira/browse/KAFKA-17202
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Minor
>  Labels: newbie
> Fix For: 3.9.0
>
>
> This method creates a KafkaConsumer, but does not close it.
> We can use a try-with-resources to ensure the consumer is closed prior to 
> returning or throwing from this function.
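
A minimal sketch of the try-with-resources pattern suggested above (the group id 
and bootstrap address are illustrative, and the verification body is omitted):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Hypothetical sketch of the fix: open the verification consumer in a
// try-with-resources block so it is always closed, even if the body throws.
class ChangelogOffsetCheck {
    static void verifyWithConsumer(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "changelog-offset-check"); // illustrative
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // ... read changelog end offsets and compare them to the checkpointed offset ...
        } // consumer.close() runs automatically here, even on exceptions
    }
}
{code}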



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-4928) Add integration test for DumpLogSegments

2024-07-29 Thread Volk Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869406#comment-17869406
 ] 

Volk Huang commented on KAFKA-4928:
---

[~chia7712] No problem, thank you for the tips!

> Add integration test for DumpLogSegments
> 
>
> Key: KAFKA-4928
> URL: https://issues.apache.org/jira/browse/KAFKA-4928
> Project: Kafka
>  Issue Type: Test
>  Components: log, tools
>Reporter: Ismael Juma
>Assignee: Volk Huang
>Priority: Major
>  Labels: newbie
>
> DumpLogSegments is an important tool to analyse log files, but we have no 
> JUnit tests for it. It would be good to have some tests that verify that the 
> output is sane for a populated log.
> Our system tests call DumpLogSegments, but we should be able to detect 
> regressions via the JUnit test suite.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-4928) Add integration test for DumpLogSegments

2024-07-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869400#comment-17869400
 ] 

Chia-Ping Tsai commented on KAFKA-4928:
---

[~soravolk] I have assigned this Jira to you. BTW, please add the integration test 
to `DumpLogSegmentsTest`. Also, please use the new test infra, 
`ClusterTestExtensions`. Thanks!

> Add integration test for DumpLogSegments
> 
>
> Key: KAFKA-4928
> URL: https://issues.apache.org/jira/browse/KAFKA-4928
> Project: Kafka
>  Issue Type: Test
>  Components: log, tools
>Reporter: Ismael Juma
>Assignee: Volk Huang
>Priority: Major
>  Labels: newbie
>
> DumpLogSegments is an important tool to analyse log files, but we have no 
> JUnit tests for it. It would be good to have some tests that verify that the 
> output is sane for a populated log.
> Our system tests call DumpLogSegments, but we should be able to detect 
> regressions via the JUnit test suite.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-4928) Add integration test for DumpLogSegments

2024-07-29 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-4928:
-

Assignee: Volk Huang

> Add integration test for DumpLogSegments
> 
>
> Key: KAFKA-4928
> URL: https://issues.apache.org/jira/browse/KAFKA-4928
> Project: Kafka
>  Issue Type: Test
>  Components: log, tools
>Reporter: Ismael Juma
>Assignee: Volk Huang
>Priority: Major
>  Labels: newbie
>
> DumpLogSegments is an important tool to analyse log files, but we have no 
> JUnit tests for it. It would be good to have some tests that verify that the 
> output is sane for a populated log.
> Our system tests call DumpLogSegments, but we should be able to detect 
> regressions via the JUnit test suite.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17201) SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks sockets and threads

2024-07-29 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17201:
--

Assignee: TengYao Chi

> SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks 
> sockets and threads
> 
>
> Key: KAFKA-17201
> URL: https://issues.apache.org/jira/browse/KAFKA-17201
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Minor
>  Labels: flaky-test, newbie
>
> This test creates multiple Threads which in turn open sockets. It is possible 
> that these threads and sockets outlive the test itself, as the threads are 
> not joined before the test completes.
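
A minimal sketch of the kind of cleanup that avoids the leak (all names are 
illustrative, not the actual SelectorTest code): track every spawned thread and 
socket and release them before the test returns.
{code:java}
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: keep references to everything the test spawns and
// release them in a finally block / @AfterEach, so nothing outlives the test.
class ConnectionCreationMetricFixture {
    private final List<Thread> senders = new ArrayList<>();
    private final List<Socket> sockets = new ArrayList<>();

    void startSender(String host, int port) throws Exception {
        Socket socket = new Socket(host, port);
        sockets.add(socket);
        Thread t = new Thread(() -> {
            // ... write a few bytes so the selector registers the connection ...
        }, "test-sender-" + senders.size());
        senders.add(t);
        t.start();
    }

    void close() throws Exception {
        for (Socket socket : sockets) {
            socket.close();
        }
        for (Thread t : senders) {
            t.join(10_000); // bounded wait; fail the test separately if it times out
        }
    }
}
{code}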



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-4928) Add integration test for DumpLogSegments

2024-07-29 Thread Volk Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869399#comment-17869399
 ] 

Volk Huang commented on KAFKA-4928:
---

Hi [~ijuma], can I try this one? Thank you so much.

> Add integration test for DumpLogSegments
> 
>
> Key: KAFKA-4928
> URL: https://issues.apache.org/jira/browse/KAFKA-4928
> Project: Kafka
>  Issue Type: Test
>  Components: log, tools
>Reporter: Ismael Juma
>Priority: Major
>  Labels: newbie
>
> DumpLogSegments is an important tool to analyse log files, but we have no 
> JUnit tests for it. It would be good to have some tests that verify that the 
> output is sane for a populated log.
> Our system tests call DumpLogSegments, but we should be able to detect 
> regressions via the JUnit test suite.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close

2024-07-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869396#comment-17869396
 ] 

Chia-Ping Tsai commented on KAFKA-17116:


[~dajac] thanks for sharing the idea and details.

The basic idea is that the client needs to know/have the UUID before the member is 
added to the group. IMHO both solutions have pros and cons. However, option 2 
(using the first HB to get the UUID from the server) needs to deal with 
compatibility, since an old server will still add the member to the group when 
handling the first HB. That is to say, the client would need to distinguish whether 
this member ID was added to the group or not; otherwise it may produce some weird 
log messages. By contrast, option 1 is non-invasive, since our server can already 
handle HBs with or without a client-defined member ID. Also, non-Java clients do 
not need to change any code if they don't care about this edge case. If they do 
care about it, Python, Golang, .NET, and Rust have built-in UUID support, and there 
are many UUID implementations for C++ (uuid_v4, Boost's uuid, etc.). Hence, I 
assume UUID support is not a big issue.
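
For illustration, a minimal sketch of option 1 (client-side member-id generation) 
using the existing {{org.apache.kafka.common.Uuid}} helper; the surrounding 
heartbeat plumbing is omitted:
{code:java}
import org.apache.kafka.common.Uuid;

// Hypothetical sketch of option 1: the client generates its member id up front,
// so a leave-group HB sent before the first HB response still carries a usable id.
class MemberIdSketch {
    public static void main(String[] args) {
        String memberId = Uuid.randomUuid().toString(); // known before joining
        System.out.println("member id generated client-side: " + memberId);
        // The same id would then be sent in the first ConsumerGroupHeartbeat request
        // and reused in the leave-group heartbeat (epoch -1) on close().
    }
}
{code}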



> New consumer may not send effective leave group if member ID received after 
> close 
> --
>
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the new consumer is closed after sending a HB to join, but before 
> receiving the response to it, it will send a leave group request but without 
> member ID (which will simply fail with UNKNOWN_MEMBER_ID). As a result, the 
> broker will have registered a new member for which it will never receive a 
> leave request.
>  # consumer.subscribe -> sends HB to join, transitions to JOINING
>  # consumer.close -> will transition to LEAVING and send HB with epoch -1 
> (without waiting for in-flight requests)
>  # consumer receives response to initial HB, containing the assigned member 
> ID. It will simply ignore it because it's not in the group anymore 
> (UNSUBSCRIBED)
> Note that the expectation, with the current logic, and main downsides of this 
> are:
>  # If the case was that the member received partitions on the first HB, those 
> partitions won't be re-assigned (broker waiting for the closed consumer to 
> reconcile them), until the rebalance timeout expires. 
>  # Even if no partitions were assigned to it, the member will remain in the 
> group from the broker point of view (but not from the client POV). The member 
> will eventually be kicked out for not sending HBs, but only when its session 
> timeout expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17137) Ensure Admin APIs are properly tested

2024-07-29 Thread Kuan Po Tseng (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kuan Po Tseng updated KAFKA-17137:
--
Description: 
A number of Admin client APIs don't have integration tests. While testing 3.8.0 
RC0 we discovered the Admin.describeTopics() API hung. This should have been 
caught by tests.

I suggest to create subtasks for each API that needs tests.

*Part 0*

[https://github.com/apache/kafka/pull/16676]

createTopics + retryOnQuotaViolation
deleteTopics + timeoutMs
deleteTopics + retryOnQuotaViolation
listTopics + timeoutMs
listTopics + listInternal
describeTopics + timeoutMs
describeTopics + partitionSizeLimitPerResponse

*Part 1*
[https://github.com/apache/kafka/pull/16648]
describeAcls + timeoutMs
createAcls + timeoutMs
deleteAcls + timeoutMs
describeConfigs + timeoutMs
alterConfigs + timeoutMs
createPartitions + retryOnQuotaViolation
expireDelegationToken + expiryTimePeriodMs

*Part 2*
[https://github.com/apache/kafka/pull/16717]
listConsumerGroups + withTypes
listConsumerGroupOffsets(groupId) + requireStable
listConsumerGroupOffsets(groupSpecs) + requireStable
listConsumerGroupOffsets(groupSpecs)
removeMembersFromConsumerGroup + reason
alterConsumerGroupOffsets
alterClientQuotas + validateOnly

*Part 3*
[https://github.com/apache/kafka/pull/16652]
describeUserScramCredentials
describeProducers + brokerId
describeTransactions + timeoutMs
abortTransaction + timeoutMs
listTransactions + filterStates
listTransactions + filterProducerIds
listTransactions + filterOnDuration

*Part 4*
[https://github.com/apache/kafka/pull/16658]
fenceProducers + timeoutMs
listClientMetricsResources
listClientMetricsResources + timeoutMs
clientInstanceId
addRaftVoter
removeRaftVoter
metrics

  was:
A number of Admin client APIs don't have integration tests. While testing 3.8.0 
RC0 we discovered the Admin.describeTopics() API hung. This should have been 
caught by tests.

I suggest to create subtasks for each API that needs tests.

*Part 0*

[https://github.com/apache/kafka/pull/16676]

createTopics + retryOnQuotaViolation
deleteTopics + timeoutMs
deleteTopics + retryOnQuotaViolation
listTopics + timeoutMs
listTopics + listInternal
describeTopics + timeoutMs
describeTopics + partitionSizeLimitPerResponse

*Part 1*
[https://github.com/apache/kafka/pull/16648]
describeAcls + timeoutMs
createAcls + timeoutMs
deleteAcls + timeoutMs
describeConfigs + timeoutMs
alterConfigs + timeoutMs
createPartitions + retryOnQuotaViolation
expireDelegationToken + expiryTimePeriodMs

*Part 2*
listConsumerGroups + withTypes
listConsumerGroupOffsets(groupId) + requireStable
listConsumerGroupOffsets(groupSpecs) + requireStable
listConsumerGroupOffsets(groupSpecs)
removeMembersFromConsumerGroup + reason
alterConsumerGroupOffsets
alterClientQuotas + validateOnly

*Part 3*
[https://github.com/apache/kafka/pull/16652]
describeUserScramCredentials
describeProducers + brokerId
describeTransactions + timeoutMs
abortTransaction + timeoutMs
listTransactions + filterStates
listTransactions + filterProducerIds
listTransactions + filterOnDuration

*Part 4*
[https://github.com/apache/kafka/pull/16658]
fenceProducers + timeoutMs
listClientMetricsResources
listClientMetricsResources + timeoutMs
clientInstanceId
addRaftVoter
removeRaftVoter
metrics


> Ensure Admin APIs are properly tested
> -
>
> Key: KAFKA-17137
> URL: https://issues.apache.org/jira/browse/KAFKA-17137
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Mickael Maison
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> A number of Admin client APIs don't have integration tests. While testing 
> 3.8.0 RC0 we discovered the Admin.describeTopics() API hung. This should have 
> been caught by tests.
> I suggest to create subtasks for each API that needs tests.
> *Part 0*
> [https://github.com/apache/kafka/pull/16676]
> createTopics + retryOnQuotaViolation
> deleteTopics + timeoutMs
> deleteTopics + retryOnQuotaViolation
> listTopics + timeoutMs
> listTopics + listInternal
> describeTopics + timeoutMs
> describeTopics + partitionSizeLimitPerResponse
> *Part 1*
> [https://github.com/apache/kafka/pull/16648]
> describeAcls + timeoutMs
> createAcls + timeoutMs
> deleteAcls + timeoutMs
> describeConfigs + timeoutMs
> alterConfigs + timeoutMs
> createPartitions + retryOnQuotaViolation
> expireDelegationToken + expiryTimePeriodMs
> *Part 2*
> [https://github.com/apache/kafka/pull/16717]
> listConsumerGroups + withTypes
> listConsumerGroupOffsets(groupId) + requireStable
> listConsumerGroupOffsets(groupSpecs) + requireStable
> listConsumerGroupOffsets(groupSpecs)
> removeMembersFromConsumerGroup + reason
> alterConsumerGroupOffse
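
As a rough illustration of the kind of coverage intended here, a minimal sketch 
that exercises one API + option pair (listTopics + listInternal); the bootstrap 
address is illustrative and assertions are omitted:
{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsOptions;

// Hypothetical sketch: check that listInternal(true) also returns internal topics
// such as __consumer_offsets, while the default options do not.
class ListTopicsOptionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
        try (Admin admin = Admin.create(props)) {
            Set<String> withInternal = admin
                    .listTopics(new ListTopicsOptions().listInternal(true).timeoutMs(15_000))
                    .names().get();
            Set<String> withoutInternal = admin
                    .listTopics(new ListTopicsOptions())
                    .names().get();
            System.out.println("with internal:    " + withInternal);
            System.out.println("without internal: " + withoutInternal);
        }
    }
}
{code}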

[jira] [Updated] (KAFKA-17200) Enable MM2 to replicate topics ending in "internal" suffix

2024-07-29 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-17200:
--
Labels: needs-kip  (was: )

> Enable MM2 to replicate topics ending in "internal" suffix
> --
>
> Key: KAFKA-17200
> URL: https://issues.apache.org/jira/browse/KAFKA-17200
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Patrik Márton
>Assignee: Patrik Márton
>Priority: Minor
>  Labels: needs-kip
>
> In the current Mirror Maker 2 implementation, topics ending in ".internal" or 
> "-internal" cannot be replicated as they are considered connect/mm2 internal 
> topics. 
> In some cases, users have business topics ending in ".internal" or 
> "-internal" that are excluded from the replication for the same reason. This 
> is because of two things:
> (1) The ReplicationPolicy interface excludes all topics ending in ".internal" 
> or "-internal" from the replication, as they are considered internal connect 
> topics:
> {code:java}
> /** Internal topics are never replicated. */
> default boolean isInternalTopic(String topic) {
> boolean isKafkaInternalTopic = topic.startsWith("__") || 
> topic.startsWith(".");
> boolean isDefaultConnectTopic =  topic.endsWith("-internal") ||  
> topic.endsWith(".internal");
> return isMM2InternalTopic(topic) || isKafkaInternalTopic || 
> isDefaultConnectTopic;
> } {code}
> (2) The DefaultTopicFilter has the following default exclude regular 
> expression:
> {code:java}
> ".*[\\-\\.]internal, .*\\.replica, __.*" {code}
> The goal of this ticket is to enable the replication of such business topics, 
> while making sure that kafka internal topics are not replicated.
> *Solution 1:* The DefaultTopicFilter can be configured to have a different 
> exclude list with a more specific regex that excludes all Kafka internal topics 
> but includes business internal topics, e.g.:
> {code:java}
> "mm2.*[\\-\\.]internal, .*\\.replica, __.*"; {code}
> As these topics are explicitly excluded by the ReplicationPolicy interface 
> too, we can derive a new replication policy from DefaultReplicationPolicy that 
> preserves the same behavior but overrides the isInternalTopic() method so that 
> topics ending with the internal suffix are not considered internal topics. 
> Obviously, in this case it is important to set up the topic filter correctly 
> to avoid replicating Kafka internal topics.
>  
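
A minimal sketch of the replication policy described in Solution 1 (the class name 
is illustrative):
{code:java}
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

// Hypothetical sketch: same behavior as DefaultReplicationPolicy, except topics
// ending in "-internal"/".internal" are no longer treated as internal. The topic
// filter must then exclude the real connect/MM2 internal topics.
public class InternalSuffixFriendlyReplicationPolicy extends DefaultReplicationPolicy {

    @Override
    public boolean isInternalTopic(String topic) {
        boolean isKafkaInternalTopic = topic.startsWith("__") || topic.startsWith(".");
        // Deliberately drop the isDefaultConnectTopic check from the default policy.
        return isMM2InternalTopic(topic) || isKafkaInternalTopic;
    }
}
{code}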



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients

2024-07-29 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-17207:
-

Assignee: Xuze Yang  (was: Chris Egerton)

> ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking 
> clients
> -
>
> Key: KAFKA-17207
> URL: https://issues.apache.org/jira/browse/KAFKA-17207
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: Xuze Yang
>Priority: Minor
>  Labels: newbie
>
> The testRequestTimeouts deletes the internal config topic, putting the 
> connect workers into a bad state. When the test goes to clean up, it calls 
> DistributedHerder#stop, which waits for the herder executor to stop. This 
> times out, because the herder executor is blocked closing the 
> KafkaConfigBackingStore's producer. This log message gets printed:
> {noformat}
> [2024-07-26 11:52:50,817] ERROR Executor 
> java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not 
> terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat}
> This effectively leaks the Kafka clients for the workers' internal topics, 
> and the herder executor thread. Instead, either the producer should not block 
> indefinitely on a missing topic, or the cluster state should be healed enough 
> for the producer to shutdown cleanly.
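
As a rough illustration of the first option (not blocking indefinitely), the 
producer can be closed with a bounded timeout; a minimal sketch with illustrative 
names:
{code:java}
import java.time.Duration;
import org.apache.kafka.clients.producer.KafkaProducer;

// Hypothetical sketch: close the config-topic producer with a bounded timeout
// so a missing/broken internal topic cannot block the herder shutdown forever.
final class BoundedProducerClose {
    static void closeQuietly(KafkaProducer<?, ?> producer, Duration timeout) {
        try {
            producer.close(timeout); // gives up on incomplete sends once the timeout elapses
        } catch (RuntimeException e) {
            // log and continue shutting down; leaking the client would be worse
        }
    }
}
{code}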



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients

2024-07-29 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869375#comment-17869375
 ] 

Chris Egerton commented on KAFKA-17207:
---

Whoops, Jira hotkeys conspired against me! Did not mean to assign this to 
myself.

 

Hi [~Xuze Yang] it's all yours, enjoy!

> ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking 
> clients
> -
>
> Key: KAFKA-17207
> URL: https://issues.apache.org/jira/browse/KAFKA-17207
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: newbie
>
> The testRequestTimeouts deletes the internal config topic, putting the 
> connect workers into a bad state. When the test goes to clean up, it calls 
> DistributedHerder#stop, which waits for the herder executor to stop. This 
> times out, because the herder executor is blocked closing the 
> KafkaConfigBackingStore's producer. This log message gets printed:
> {noformat}
> [2024-07-26 11:52:50,817] ERROR Executor 
> java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not 
> terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat}
> This effectively leaks the Kafka clients for the workers' internal topics, 
> and the herder executor thread. Instead, either the producer should not block 
> indefinitely on a missing topic, or the cluster state should be healed enough 
> for the producer to shutdown cleanly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients

2024-07-29 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-17207:
-

Assignee: Chris Egerton

> ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking 
> clients
> -
>
> Key: KAFKA-17207
> URL: https://issues.apache.org/jira/browse/KAFKA-17207
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: Chris Egerton
>Priority: Minor
>  Labels: newbie
>
> The testRequestTimeouts deletes the internal config topic, putting the 
> connect workers into a bad state. When the test goes to clean up, it calls 
> DistributedHerder#stop, which waits for the herder executor to stop. This 
> times out, because the herder executor is blocked closing the 
> KafkaConfigBackingStore's producer. This log message gets printed:
> {noformat}
> [2024-07-26 11:52:50,817] ERROR Executor 
> java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not 
> terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat}
> This effectively leaks the Kafka clients for the workers' internal topics, 
> and the herder executor thread. Instead, either the producer should not block 
> indefinitely on a missing topic, or the cluster state should be healed enough 
> for the producer to shutdown cleanly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup

2024-07-29 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869372#comment-17869372
 ] 

Chris Egerton commented on KAFKA-17044:
---

[~bgoyal] pinging again, please let me know if this can be closed.

> Connector deletion can lead to resource leak during a long running connector 
> startup
> 
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Bhagyashree
>Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If 
> the connector is in 
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
>  state and still executing the 
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
>  method, a DELETE API call would invoke the 
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
>  and [notify() 
> |https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297]but
>  the connector worker would not shutdown immediately. This happens because 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  is a blocking call and the control reaches 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  in 
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
>  after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call has completed. This results in a gap in the delete flow where the 
> connector is not immediately shutdown leaving the resources running. 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  keeps running and only when the execution of 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes, we reach at the point of 
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
>  and then 
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
>  of the connector worker is invoked.
> This seems similar to what has been identified for connector tasks as part of 
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector with time taking operation in 
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  call
> 2. Call DELETE API to delete this connector
> 3. The connector would be deleted only after the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  completes.
> The issue was observed when a connector was configured to retry a DB 
> connection for some time. 
> {*}Current Behaviour{*}: The connector did not shut down until the 
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
>  method completed.
> {*}Expected Behaviour{*}: The connector should abort what it is doing and 
> shut down as requested by the DELETE call.
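
A minimal sketch of the cooperative cancellation that would close this gap (all 
names are illustrative; this is not the actual WorkerConnector code):
{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: a connector start() that retries a DB connection but
// checks a stop flag between attempts, so a DELETE request can abort it early.
class RetryingStartSketch {
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    void start() throws InterruptedException {
        while (!stopping.get()) {
            if (tryConnect()) {
                return; // started successfully
            }
            TimeUnit.SECONDS.sleep(5); // retry back-off; also interruptible
        }
        // stopping requested: give up instead of blocking the shutdown path
    }

    void stop() {
        stopping.set(true);
    }

    private boolean tryConnect() {
        return false; // placeholder for the real DB connection attempt
    }
}
{code}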



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close

2024-07-29 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869371#comment-17869371
 ] 

David Jacot commented on KAFKA-17116:
-

Hi all, I was thinking about two options to resolve this issue:
 # We could actually generate the member ID on the client side. This is already 
supported by the protocol. Back then, we rejected this option because it was 
considered too difficult for non-Java clients such as librdkafka, as some languages 
do not have built-in UUID support. This may be better now; we can check. We also 
need to convince ourselves that letting the client generate it is good enough.
 # We could use the first HB sent out by the client to generate the UUID without 
adding the member to the group yet. The response to that first HB would carry the 
new member ID, zero as the member epoch, and zero as the heartbeat interval so that 
the client heartbeats again immediately. This is basically a way to do 1. but 
without relying on the client to generate it. Note that this does not require any 
changes to the protocol either. An alternative would be to return a new error code 
to make it explicit, but I am not sure it is worth it.

I am not a fan of the idea of passing a temporary ID on the client that we 
would store on the server. If we believe that we could generate an id on the 
client, we should just do 1.

I think that we could also improve the client state machine to better handle 
those cases.

> New consumer may not send effective leave group if member ID received after 
> close 
> --
>
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Lianet Magrans
>Assignee: TengYao Chi
>Priority: Major
>  Labels: kip-848-client-support
> Fix For: 3.9.0
>
>
> If the new consumer is closed after sending a HB to join, but before 
> receiving the response to it, it will send a leave group request but without 
> member ID (which will simply fail with UNKNOWN_MEMBER_ID). As a result, the 
> broker will have registered a new member for which it will never receive a 
> leave request.
>  # consumer.subscribe -> sends HB to join, transitions to JOINING
>  # consumer.close -> will transition to LEAVING and send HB with epoch -1 
> (without waiting for in-flight requests)
>  # consumer receives response to initial HB, containing the assigned member 
> ID. It will simply ignore it because it's not in the group anymore 
> (UNSUBSCRIBED)
> Note that the expectation, with the current logic, and main downsides of this 
> are:
>  # If the case was that the member received partitions on the first HB, those 
> partitions won't be re-assigned (broker waiting for the closed consumer to 
> reconcile them), until the rebalance timeout expires. 
>  # Even if no partitions were assigned to it, the member will remain in the 
> group from the broker point of view (but not from the client POV). The member 
> will eventually be kicked out for not sending HBs, but only when its session 
> timeout expires.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback

2024-07-29 Thread zhengke zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengke zhou reassigned KAFKA-16263:


Assignee: zhengke zhou  (was: Kuan Po Tseng)

> Add Kafka Streams docs about available listeners/callback
> -
>
> Key: KAFKA-16263
> URL: https://issues.apache.org/jira/browse/KAFKA-16263
> Project: Kafka
>  Issue Type: Task
>  Components: docs, streams
>Reporter: Matthias J. Sax
>Assignee: zhengke zhou
>Priority: Minor
>  Labels: beginner, newbie
>
> Kafka Streams allows to register all kind of listeners and callback (eg, 
> uncaught-exception-handler, restore-listeners, etc) but those are not in the 
> documentation.
> A good place might be 
> [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html]
>  
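
For reference, a minimal sketch of the main listeners/callbacks that such a docs 
section could cover, registered before {{streams.start()}} (the handler bodies are 
illustrative):
{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Hypothetical sketch: the main listeners/callbacks a Kafka Streams application
// can register; 'streams' is assumed to be an already-built KafkaStreams instance.
class StreamsCallbacksSketch {
    static void registerCallbacks(KafkaStreams streams) {
        // Uncaught exception handler: choose to replace the failed thread or shut down.
        streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);

        // State listener: observe transitions such as REBALANCING -> RUNNING.
        streams.setStateListener((newState, oldState) ->
                System.out.println("state " + oldState + " -> " + newState));

        // Global restore listener: track state-store restoration progress.
        streams.setGlobalStateRestoreListener(new StateRestoreListener() {
            @Override
            public void onRestoreStart(TopicPartition tp, String store, long start, long end) { }

            @Override
            public void onBatchRestored(TopicPartition tp, String store, long batchEnd, long numRestored) { }

            @Override
            public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) { }
        });
    }
}
{code}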



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17213) Change ControllerMovedException from ApiException to InvalidMetadataException.

2024-07-29 Thread xiaochen.zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaochen.zhou updated KAFKA-17213:
--
Fix Version/s: 3.8.0
   (was: 3.7.0)

> Change ControllerMovedException from ApiException to InvalidMetadataException.
> --
>
> Key: KAFKA-17213
> URL: https://issues.apache.org/jira/browse/KAFKA-17213
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: xiaochen.zhou
>Priority: Minor
> Fix For: 3.8.0
>
>
> After the Kafka client fails to send, it will update the metadata. The 
> {{InvalidMetadataException}} is retryable. I think the Controller information 
> also belongs to Kafka's metadata, so {{ControllerMovedException}} should be 
> {{{}InvalidMetadataException{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17213) Change ControllerMovedException from ApiException to InvalidMetadataException.

2024-07-29 Thread xiaochen.zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaochen.zhou updated KAFKA-17213:
--
  Component/s: clients
Fix Version/s: 3.7.0
  Description: After the Kafka client fails to send, it will update the 
metadata. The {{InvalidMetadataException}} is retryable. I think the Controller 
information also belongs to Kafka's metadata, so {{ControllerMovedException}} 
should be {{{}InvalidMetadataException{}}}.
   Issue Type: Improvement  (was: New Feature)
 Priority: Minor  (was: Major)
  Summary: Change ControllerMovedException from ApiException to 
InvalidMetadataException.  (was: Make  )

> Change ControllerMovedException from ApiException to InvalidMetadataException.
> --
>
> Key: KAFKA-17213
> URL: https://issues.apache.org/jira/browse/KAFKA-17213
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: xiaochen.zhou
>Priority: Minor
> Fix For: 3.7.0
>
>
> After the Kafka client fails to send, it will update the metadata. The 
> {{InvalidMetadataException}} is retryable. I think the Controller information 
> also belongs to Kafka's metadata, so {{ControllerMovedException}} should be 
> {{{}InvalidMetadataException{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17213) Make

2024-07-29 Thread xiaochen.zhou (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xiaochen.zhou updated KAFKA-17213:
--
Summary: Make(was: Make )

> Make  
> --
>
> Key: KAFKA-17213
> URL: https://issues.apache.org/jira/browse/KAFKA-17213
> Project: Kafka
>  Issue Type: New Feature
>Reporter: xiaochen.zhou
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17213) Make

2024-07-29 Thread xiaochen.zhou (Jira)
xiaochen.zhou created KAFKA-17213:
-

 Summary: Make 
 Key: KAFKA-17213
 URL: https://issues.apache.org/jira/browse/KAFKA-17213
 Project: Kafka
  Issue Type: New Feature
Reporter: xiaochen.zhou






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-17177) reviewers.py should grep "authors" to offer more candidates of reviewers information

2024-07-29 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17177.

Fix Version/s: 3.9.0
   Resolution: Fixed

> reviewers.py should grep "authors" to offer more candidates of reviewers 
> information
> 
>
> Key: KAFKA-17177
> URL: https://issues.apache.org/jira/browse/KAFKA-17177
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia Chuan Yu
>Priority: Minor
> Fix For: 3.9.0
>
>
> It seems to me `author` should be a qualified reviewer too. Also, `author` 
> normally has an "email", which is sometimes hard to find from the GitHub page ...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17211) kafka producer get stuck by Uncaught error in kafka producer I/O thread

2024-07-29 Thread yapeng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869311#comment-17869311
 ] 

yapeng commented on KAFKA-17211:


I noticed that the producer sender thread polls elements from the 
Deque<ProducerBatch> while the producer client may invoke RecordAccumulator.append. 
Both actions modify the head and tail of the ArrayDeque. Although access is 
synchronized, it may still have race conditions because head and tail are not 
declared volatile in ArrayDeque.
{code:java}
public class ArrayDeque<E> extends AbstractCollection<E>
   implements Deque<E>, Cloneable, Serializable
{
/**
 * The array in which the elements of the deque are stored.
 * The capacity of the deque is the length of this array, which is
 * always a power of two. The array is never allowed to become
 * full, except transiently within an addX method where it is
 * resized (see doubleCapacity) immediately upon becoming full,
 * thus avoiding head and tail wrapping around to equal each
 * other.  We also guarantee that all array cells not holding
 * deque elements are always null.
 */
transient Object[] elements; // non-private to simplify nested class access

/**
 * The index of the element at the head of the deque (which is the
 * element that would be removed by remove() or pop()); or an
 * arbitrary number equal to tail if the deque is empty.
 */
transient int head;

/**
 * The index at which the next element would be added to the tail
 * of the deque (via addLast(E), add(E), or push(E)).
 */
transient int tail;
.
.
 }{code}
 
Could this possibly be the cause of this bug?

> kafka producer get stuck by Uncaught error in kafka producer I/O thread
> ---
>
> Key: KAFKA-17211
> URL: https://issues.apache.org/jira/browse/KAFKA-17211
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.2
>Reporter: yapeng
>Priority: Major
>
> I have a kafka producer and it reported this error message forever.
> {code:java}
> [Producer clientId=xxx] Uncaught error in kafka producer I/O 
> thread: 
> java.util.NoSuchElementException: null
>   at java.util.ArrayDeque.getFirst(ArrayDeque.java:329)
>   at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.expiredBatches(RecordAccumulator.java:306)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:372)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:326)
>   at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:255)
>   at java.lang.Thread.run(Thread.java:750){code}
> It seems weird that there are null values in the Deque<ProducerBatch>. The 
> machine has been restarted to resolve the online issue, so there is no heap dump.
> Please share any insights about this issue.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17212) Segments containing a single message can be incorrectly marked as local only

2024-07-29 Thread Guillaume Mallet (Jira)
Guillaume Mallet created KAFKA-17212:


 Summary: Segments containing a single message can be incorrectly 
marked as local only
 Key: KAFKA-17212
 URL: https://issues.apache.org/jira/browse/KAFKA-17212
 Project: Kafka
  Issue Type: Bug
  Components: Tiered-Storage
Affects Versions: 3.7.1, 3.8.0, 3.9.0
Reporter: Guillaume Mallet


There is an edge case in which a segment containing a single message is 
incorrectly considered local-only, which skews the deletion process towards 
deleting more data.

 

*This is very unlikely to happen in a real scenario but can happen in tests 
when segments are rolled manually.* 
*It could possibly happen when segments are rolled based on time, but even then 
the skew would be minimal.*
h2. What happens

In order to delete the right amount of data against the byte retention policy, 
we first count, in the 
[buildRetentionSizeData|https://github.com/apache/kafka/blob/09be14bb09dc336f941a7859232094bfb3cb3b96/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1335]
 function, all the bytes that are breaching {{{}retention.bytes{}}}. To do this, 
the size of each segment is added to the size of the segments present only on 
disk, {{{}onlyLocalLogSegmentsSize{}}}.
Listing the segments only present on disk is done through the function 
[onlyLocalLogSegmentSize|https://github.com/apache/kafka/blob/a0f6e6f816c6ac3fbbc4e0dc503dc43bfacfe6c7/core/src/main/scala/kafka/log/UnifiedLog.scala#L1618-L1619]
 by adding the size of each segment that has a _baseOffset_ greater than or equal 
to {{highestOffsetInRemoteStorage}}.

{{highestOffsetInRemoteStorage}} is the highest offset that has been 
successfully sent to the remote store.
The _baseOffset_ of a segment is “a [lower bound ({*}inclusive{*}) of the 
offset in the 
segment”|https://github.com/apache/kafka/blob/a0f6e6f816c6ac3fbbc4e0dc503dc43bfacfe6c7/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java#L115].
 
In the case of a segment with a single message, the baseOffset can be equal to 
_highestOffsetInRemoteStorage,_ which means that despite the offset being 
offloaded to the RemoteStorage, we would count that segment as local only.

This has consequences when counting the bytes to delete, as we will count the 
size of this segment twice in the 
[buildRetentionSizeData|https://github.com/apache/kafka/blob/09be14bb09dc336f941a7859232094bfb3cb3b96/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1155],
 once as a segment offloaded in the RemoteStorage and once as a local segment 
when 
[onlyLocalSegmentSize|https://github.com/apache/kafka/blob/a0f6e6f816c6ac3fbbc4e0dc503dc43bfacfe6c7/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1361-L1363]
 is added. 

The result is that {{remainingBreachedSize}} will be higher than expected, which 
can lead to more bytes being deleted than we would initially expect, by up to the 
size of the segment that is double counted.

The issue is due to the fact that we are using a greater-than-or-equal comparison 
rather than a strictly-greater-than one. A segment present only locally will have 
a {{baseOffset}} strictly greater than {{highestOffsetInRemoteStorage}}.
h2. Reproducing the issue

The problem is highlighted in the two tests added in this 
[commit|https://github.com/apache/kafka/commit/97af351db517d69a2b37c92861e463a6d0c5cb8f].
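
A minimal, self-contained sketch of the comparison issue with illustrative values 
(not the actual UnifiedLog code):
{code:java}
// Hypothetical sketch of the comparison (illustrative offsets, not UnifiedLog code).
class LocalOnlySegmentCheck {
    public static void main(String[] args) {
        long highestOffsetInRemoteStorage = 42L;      // already offloaded up to offset 42
        long baseOffsetOfSingleRecordSegment = 42L;   // segment containing only offset 42

        // current check: ">=" marks the segment local-only even though it is offloaded
        boolean countedAsLocalToday = baseOffsetOfSingleRecordSegment >= highestOffsetInRemoteStorage;

        // proposed check: ">" only counts segments that truly exist only on disk
        boolean countedAsLocalProposed = baseOffsetOfSingleRecordSegment > highestOffsetInRemoteStorage;

        System.out.println(countedAsLocalToday + " vs " + countedAsLocalProposed); // true vs false
    }
}
{code}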



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer

2024-07-29 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869299#comment-17869299
 ] 

Chia-Ping Tsai commented on KAFKA-16390:


[~frankvicky] thanks for taking this over. Please feel free to file a PR for it

> consume_bench_test.py failed using AsyncKafkaConsumer
> -
>
> Key: KAFKA-16390
> URL: https://issues.apache.org/jira/browse/KAFKA-16390
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273
> The following tests failed using the consumer group protocol
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
>  {code}
> Because of
> {code:java}
>  TimeoutError('consume_workload failed to finish in the expected amount of 
> time.')
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", 
> line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 
> 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the 
> expected amount of time. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer

2024-07-29 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869293#comment-17869293
 ] 

TengYao Chi commented on KAFKA-16390:
-

Hello [~kirktrue] 

I have some ideas regarding this issue. If you haven’t started working on it 
yet, I’d like to give it a try. :D

> consume_bench_test.py failed using AsyncKafkaConsumer
> -
>
> Key: KAFKA-16390
> URL: https://issues.apache.org/jira/browse/KAFKA-16390
> Project: Kafka
>  Issue Type: Task
>  Components: consumer, system tests
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>  Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273
> The following tests failed using the consumer group protocol
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
>  {code}
> Because of
> {code:java}
>  TimeoutError('consume_workload failed to finish in the expected amount of 
> time.')
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 186, in _do_run
>     data = self.run_test()
>   File 
> "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", 
> line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
> 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", 
> line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 
> 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 
> 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
> last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the 
> expected amount of time. {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17211) kafka producer get stuck by Uncaught error in kafka producer I/O thread

2024-07-29 Thread yapeng (Jira)
yapeng created KAFKA-17211:
--

 Summary: kafka producer get stuck by Uncaught error in kafka 
producer I/O thread
 Key: KAFKA-17211
 URL: https://issues.apache.org/jira/browse/KAFKA-17211
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.8.2
Reporter: yapeng


I have a kafka producer and it reported this error message forever.
{code:java}
[Producer clientId=xxx] Uncaught error in kafka producer I/O 
thread: 
java.util.NoSuchElementException: null
at java.util.ArrayDeque.getFirst(ArrayDeque.java:329)
at 
org.apache.kafka.clients.producer.internals.RecordAccumulator.expiredBatches(RecordAccumulator.java:306)
at 
org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:372)
at 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:326)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:255)
at java.lang.Thread.run(Thread.java:750){code}
It seems weird that there are null values in the Deque. The machine 
has been restarted to resolve the online issue, so there is no heap dump.

Please help give some insights about this issue.
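
For reference, a minimal sketch of the JDK behaviour shown in the first frame of the stack trace: {{ArrayDeque.getFirst()}} throws {{NoSuchElementException}} when the deque is empty, which suggests {{expiredBatches}} may have observed a deque that was drained concurrently. This is only an illustration of the exception type, not the producer code itself; the class and names below are hypothetical.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

public class EmptyDequeDemo {
    public static void main(String[] args) {
        Deque<String> batches = new ArrayDeque<>();
        // getFirst() on an empty ArrayDeque throws NoSuchElementException,
        // matching the top frame of the stack trace above.
        try {
            batches.getFirst();
        } catch (NoSuchElementException e) {
            System.out.println("caught: " + e);
        }
        // peekFirst() is the null-returning alternative that avoids the throw.
        System.out.println("peekFirst: " + batches.peekFirst());
    }
}
{code}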



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards

2024-07-29 Thread Simone Brundu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869256#comment-17869256
 ] 

Simone Brundu commented on KAFKA-15330:
---

I completely agree with having a clean cluster for the migration without any 
AUTH/SCRAM/SSL. The problem is that removing all these extra configs will require 
a short downtime before and after the migration. I can give it a try with a 
cluster without any auth, but I need to find a way to expose a basic 
unauthenticated PLAINTEXT listener.
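
For illustration only, a minimal broker-side fragment along those lines, mirroring the properties quoted in the issue below but with authentication stripped. Host names, ports and node ids are placeholders, not taken from this cluster; treat this as a sketch, not a reference setup.
{code:java}
# Hypothetical minimal broker config for a migration test without auth.
listeners=PLAINTEXT://broker01:9092
advertised.listeners=PLAINTEXT://broker01:9092
inter.broker.listener.name=PLAINTEXT
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
controller.listener.names=CONTROLLER
controller.quorum.voters=1@controller01:9093,2@controller02:9093,3@controller03:9093
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
zookeeper.metadata.migration.enable=true
{code}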

> Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
> 
>
> Key: KAFKA-15330
> URL: https://issues.apache.org/jira/browse/KAFKA-15330
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.5.1
> Environment: Debian Bookworm/12.1
> kafka 3.4 and 3.5 / scala 2.13
> OpenJDK Runtime Environment (build 17.0.8+7-Debian-1deb12u1)
>Reporter: Roland Sommer
>Priority: Major
> Attachments: broker.properties, controller.properties
>
>
> We recently did some migration testing from our old ZK-based kafka clusters 
> to KRaft while still being on kafka 3.4. The migration tests succeeded at 
> first try. In the meantime we updated to kafka 3.5/3.5.1 and now we wanted to 
> continue our migration work, which ran into unexpected problems.
> On the controller we get messages like:
> {code:java}
> Aug 10 06:49:33 kafkactl01 kafka-server-start.sh[48572]: [2023-08-10 
> 06:49:33,072] INFO [KRaftMigrationDriver id=495] Still waiting for all 
> controller nodes ready to begin the migration. due to: Missing apiVersion 
> from nodes: [514, 760] 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver){code}
> On the broker side, we see:
> {code:java}
> 06:52:56,109] INFO [BrokerLifecycleManager id=6 isZkBroker=true] Unable to 
> register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager){code}
> If we reinstall the same development cluster with kafka 3.4, using the exact 
> same steps provided by your migration documentation (only difference is using 
> {{inter.broker.protocol.version=3.4}} instead of 
> {{{}inter.broker.protocol.version=3.5{}}}), everything works as expected. 
> Updating to kafka 3.5/3.5.1 yields the same problems.
> Testing is done on a three-node kafka cluster with a three-node zookeeper 
> ensemble and a three-node controller setup.
> Besides our default configuration containing the active zookeeper hosts etc., 
> this is what was added on the brokers:
> {code:java}
> # Migration
> advertised.listeners=PLAINTEXT://kafka03:9092
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> controller.listener.names=CONTROLLER
> {code}
> The main controller config looks like this:
> {code:java}
> process.roles=controller
> node.id=495
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> listeners=CONTROLLER://:9093
> inter.broker.listener.name=PLAINTEXT
> controller.listener.names=CONTROLLER
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> {code}
> Both configs contain the identical {{zookeeper.connect}} settings, everything 
> is setup automatically so it should be identical on every run and we can 
> reliably reproduce migration success on kafka 3.4 and migration failure using 
> the same setup with kafka 3.5.
> There are other issues mentioning problems with ApiVersions like KAFKA-15230 
> - not quite sure if this is a duplicate of the underlying problem there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards

2024-07-29 Thread Roland Sommer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869253#comment-17869253
 ] 

Roland Sommer commented on KAFKA-15330:
---

[~saimon46] our config is basically what I attached earlier in this ticket. As 
the affected clusters run in an isolated environment, we refrained from 
additional measures like auth/SCRAM/SSL.

> Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
> 
>
> Key: KAFKA-15330
> URL: https://issues.apache.org/jira/browse/KAFKA-15330
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.5.1
> Environment: Debian Bookworm/12.1
> kafka 3.4 and 3.5 / scala 2.13
> OpenJDK Runtime Environment (build 17.0.8+7-Debian-1deb12u1)
>Reporter: Roland Sommer
>Priority: Major
> Attachments: broker.properties, controller.properties
>
>
> We recently did some migration testing from our old ZK-based kafka clusters 
> to KRaft while still being on kafka 3.4. The migration tests succeeded at 
> first try. In the meantime we updated to kafka 3.5/3.5.1 and now we wanted to 
> continue our migration work, which ran into unexpected problems.
> On the controller we get messages like:
> {code:java}
> Aug 10 06:49:33 kafkactl01 kafka-server-start.sh[48572]: [2023-08-10 
> 06:49:33,072] INFO [KRaftMigrationDriver id=495] Still waiting for all 
> controller nodes ready to begin the migration. due to: Missing apiVersion 
> from nodes: [514, 760] 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver){code}
> On the broker side, we see:
> {code:java}
> 06:52:56,109] INFO [BrokerLifecycleManager id=6 isZkBroker=true] Unable to 
> register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager){code}
> If we reinstall the same development cluster with kafka 3.4, using the exact 
> same steps provided by your migration documentation (only difference is using 
> {{inter.broker.protocol.version=3.4}} instead of 
> {{{}inter.broker.protocol.version=3.5{}}}), everything works as expected. 
> Updating to kafka 3.5/3.5.1 yields the same problems.
> Testing is done on a three-node kafka cluster with a three-node zookeeper 
> ensemble and a three-node controller setup.
> Besides our default configuration containing the active zookeeper hosts etc., 
> this is what was added on the brokers:
> {code:java}
> # Migration
> advertised.listeners=PLAINTEXT://kafka03:9092
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> controller.listener.names=CONTROLLER
> {code}
> The main controller config looks like this:
> {code:java}
> process.roles=controller
> node.id=495
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> listeners=CONTROLLER://:9093
> inter.broker.listener.name=PLAINTEXT
> controller.listener.names=CONTROLLER
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> {code}
> Both configs contain the identical {{zookeeper.connect}} settings, everything 
> is setup automatically so it should be identical on every run and we can 
> reliably reproduce migration success on kafka 3.4 and migration failure using 
> the same setup with kafka 3.5.
> There are other issues mentioning problems with ApiVersions like KAFKA-15230 
> - not quite sure if this is a duplicate of the underlying problem there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards

2024-07-29 Thread Simone Brundu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869252#comment-17869252
 ] 

Simone Brundu commented on KAFKA-15330:
---

[~rsommer], how did you manage to make the migration successful? For us, with 
3.7.1 we have this issue: KAFKA-17146. Did you have any authentication, SCRAM, or 
SSL enabled in your cluster? Maybe something fails silently during the 
migration and we don't notice it. Can you post here the configuration of the 
brokers and of the KRaft controllers?

> Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
> 
>
> Key: KAFKA-15330
> URL: https://issues.apache.org/jira/browse/KAFKA-15330
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.5.1
> Environment: Debian Bookworm/12.1
> kafka 3.4 and 3.5 / scala 2.13
> OpenJDK Runtime Environment (build 17.0.8+7-Debian-1deb12u1)
>Reporter: Roland Sommer
>Priority: Major
> Attachments: broker.properties, controller.properties
>
>
> We recently did some migration testing from our old ZK-based kafka clusters 
> to KRaft while still being on kafka 3.4. The migration tests succeeded at 
> first try. In the meantime we updated to kafka 3.5/3.5.1 and now we wanted to 
> continue our migration work, which ran into unexpected problems.
> On the controller we get messages like:
> {code:java}
> Aug 10 06:49:33 kafkactl01 kafka-server-start.sh[48572]: [2023-08-10 
> 06:49:33,072] INFO [KRaftMigrationDriver id=495] Still waiting for all 
> controller nodes ready to begin the migration. due to: Missing apiVersion 
> from nodes: [514, 760] 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver){code}
> On the broker side, we see:
> {code:java}
> 06:52:56,109] INFO [BrokerLifecycleManager id=6 isZkBroker=true] Unable to 
> register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager){code}
> If we reinstall the same development cluster with kafka 3.4, using the exact 
> same steps provided by your migration documentation (only difference is using 
> {{inter.broker.protocol.version=3.4}} instead of 
> {{{}inter.broker.protocol.version=3.5{}}}), everything works as expected. 
> Updating to kafka 3.5/3.5.1 yields the same problems.
> Testing is done on a three-node kafka cluster with a three-node zookeeper 
> ensemble and a three-node controller setup.
> Besides our default configuration containing the active zookeeper hosts etc., 
> this is what was added on the brokers:
> {code:java}
> # Migration
> advertised.listeners=PLAINTEXT://kafka03:9092
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> controller.listener.names=CONTROLLER
> {code}
> The main controller config looks like this:
> {code:java}
> process.roles=controller
> node.id=495
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> listeners=CONTROLLER://:9093
> inter.broker.listener.name=PLAINTEXT
> controller.listener.names=CONTROLLER
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> {code}
> Both configs contain the identical {{zookeeper.connect}} settings, everything 
> is setup automatically so it should be identical on every run and we can 
> reliably reproduce migration success on kafka 3.4 and migration failure using 
> the same setup with kafka 3.5.
> There are other issues mentioning problems with ApiVersions like KAFKA-15230 
> - not quite sure if this is a duplicate of the underlying problem there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17210) Broker fixes for smooth concurrent fetches on share partition

2024-07-29 Thread Abhinav Dixit (Jira)
Abhinav Dixit created KAFKA-17210:
-

 Summary: Broker fixes for smooth concurrent fetches on share 
partition
 Key: KAFKA-17210
 URL: https://issues.apache.org/jira/browse/KAFKA-17210
 Project: Kafka
  Issue Type: Sub-task
Reporter: Abhinav Dixit
Assignee: Abhinav Dixit


Identified a couple of reliability issues with the broker code for share groups:
 # The broker seems to get stuck at times when using multiple share consumers, due 
to a corner case where the second-to-last fetch request did not contain any topic 
partition to fetch, because of which the broker could never complete the last 
request. This results in a share fetch request getting stuck (see the sketch below).
 # Since the persister does not perform any business logic around sending state 
batches for a share partition, there could be scenarios where it sends state 
batches with no AVAILABLE records. This could breach the configured limit of 
in-flight messages, and hence the broker would never be able to complete the 
share fetch requests.
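
Purely as an illustration of the first point, and not the actual broker code: a fetch that arrives with no topic partitions to read should be completed immediately rather than parked, otherwise requests queued behind it can never be released. The class and names below are hypothetical placeholders.
{code:java}
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; not real broker classes.
final class ShareFetchSketch {
    static CompletableFuture<List<String>> handleFetch(List<String> partitionsToFetch) {
        if (partitionsToFetch.isEmpty()) {
            // Complete right away so later fetches are not blocked behind this one.
            return CompletableFuture.completedFuture(List.of());
        }
        // Otherwise the request would be parked until data or a timeout arrives.
        return new CompletableFuture<>();
    }

    public static void main(String[] args) {
        // A fetch with no partitions is done immediately: prints "true".
        System.out.println(handleFetch(List.of()).isDone());
    }
}
{code}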



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards

2024-07-29 Thread Roland Sommer (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869242#comment-17869242
 ] 

Roland Sommer commented on KAFKA-15330:
---

Finally, with kafka 3.7.1 all migrations were successfully finished on our 
kafka clusters.

> Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
> 
>
> Key: KAFKA-15330
> URL: https://issues.apache.org/jira/browse/KAFKA-15330
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0, 3.5.1
> Environment: Debian Bookworm/12.1
> kafka 3.4 and 3.5 / scala 2.13
> OpenJDK Runtime Environment (build 17.0.8+7-Debian-1deb12u1)
>Reporter: Roland Sommer
>Priority: Major
> Attachments: broker.properties, controller.properties
>
>
> We recently did some migration testing from our old ZK-based kafka clusters 
> to KRaft while still being on kafka 3.4. The migration tests succeeded at 
> first try. In the meantime we updated to kafka 3.5/3.5.1 and now we wanted to 
> continue our migration work, which ran into unexpected problems.
> On the controller we get messages like:
> {code:java}
> Aug 10 06:49:33 kafkactl01 kafka-server-start.sh[48572]: [2023-08-10 
> 06:49:33,072] INFO [KRaftMigrationDriver id=495] Still waiting for all 
> controller nodes ready to begin the migration. due to: Missing apiVersion 
> from nodes: [514, 760] 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver){code}
> On the broker side, we see:
> {code:java}
> 06:52:56,109] INFO [BrokerLifecycleManager id=6 isZkBroker=true] Unable to 
> register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager){code}
> If we reinstall the same development cluster with kafka 3.4, using the exact 
> same steps provided by your migration documentation (only difference is using 
> {{inter.broker.protocol.version=3.4}} instead of 
> {{{}inter.broker.protocol.version=3.5{}}}), everything works as expected. 
> Updating to kafka 3.5/3.5.1 yields the same problems.
> Testing is done on a three-node kafka cluster with a three-node zookeeper 
> ensemble and a three-node controller setup.
> Besides our default configuration containing the active zookeeper hosts etc., 
> this is what was added on the brokers:
> {code:java}
> # Migration
> advertised.listeners=PLAINTEXT://kafka03:9092
> listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> controller.listener.names=CONTROLLER
> {code}
> The main controller config looks like this:
> {code:java}
> process.roles=controller
> node.id=495
> controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093
> listeners=CONTROLLER://:9093
> inter.broker.listener.name=PLAINTEXT
> controller.listener.names=CONTROLLER
> listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
> zookeeper.metadata.migration.enable=true
> {code}
> Both configs contain the identical {{zookeeper.connect}} settings, everything 
> is setup automatically so it should be identical on every run and we can 
> reliably reproduce migration success on kafka 3.4 and migration failure using 
> the same setup with kafka 3.5.
> There are other issues mentioning problems with ApiVersions like KAFKA-15230 
> - not quite sure if this is a duplicate of the underlying problem there.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16883) Zookeeper-Kraft failing migration - RPC got timed out before it could be sent

2024-07-29 Thread Nicolas Henneaux (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicolas Henneaux resolved KAFKA-16883.
--
Fix Version/s: 3.7.1
   Resolution: Fixed

I re-executed the migration on v3.7.1 and now it works!

> Zookeeper-Kraft failing migration - RPC got timed out before it could be sent
> -
>
> Key: KAFKA-16883
> URL: https://issues.apache.org/jira/browse/KAFKA-16883
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Affects Versions: 3.7.0, 3.6.1, 3.6.2
>Reporter: Nicolas Henneaux
>Priority: Major
> Fix For: 3.7.1
>
>
> Despite several attempts to migrate from a Zookeeper cluster to KRaft, the 
> migration never completed properly.
> We spawned a new, fully healthy cluster with 3 Kafka nodes connected to 3 
> Zookeeper nodes; 3 additional Kafka nodes are there for the new controllers.
> It was tested with Kafka 3.6.1, 3.6.2 and 3.7.0.
> It might be linked to KAFKA-15330.
> The controllers are started without issue. When the brokers are then 
> configured for the migration, the migration is not starting. Once the last 
> broker is restarted, we got the following logs.
> {code:java}
> [2024-06-03 15:11:48,192] INFO [ReplicaFetcherThread-0-11]: Stopped 
> (kafka.server.ReplicaFetcherThread)
> [2024-06-03 15:11:48,193] INFO [ReplicaFetcherThread-0-11]: Shutdown 
> completed (kafka.server.ReplicaFetcherThread)
> {code}
> Then we only get the following every 30s
> {code:java}
> [2024-06-03 15:12:04,163] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager)
> [2024-06-03 15:12:34,297] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager)
> [2024-06-03 15:13:04,536] INFO [BrokerLifecycleManager id=12 isZkBroker=true] 
> Unable to register the broker because the RPC got timed out before it could 
> be sent. (kafka.server.BrokerLifecycleManager){code}
> The config on the controller node is the following
> {code:java}
> kafka0202e1 ~]$  sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | 
> grep -v password | sort
> advertised.host.name=kafka0202e1.ahub.sb.eu.ginfra.net
> broker.rack=e1
> controller.listener.names=CONTROLLER
> controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093
> default.replication.factor=3
> delete.topic.enable=false
> group.initial.rebalance.delay.ms=3000
> inter.broker.protocol.version=3.7
> listeners=CONTROLLER://kafka0202e1.ahub.sb.eu.ginfra.net:9093
> listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
> log.dirs=/data/kafka
> log.message.format.version=3.6
> log.retention.check.interval.ms=30
> log.retention.hours=240
> log.segment.bytes=1073741824
> min.insync.replicas=2
> node.id=20
> num.io.threads=8
> num.network.threads=3
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> offsets.topic.replication.factor=3
> process.roles=controller
> security.inter.broker.protocol=SSL
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> ssl.cipher.suites=TLS_AES_256_GCM_SHA384
> ssl.client.auth=required
> ssl.enabled.protocols=TLSv1.3
> ssl.endpoint.identification.algorithm=HTTPS
> ssl.keystore.location=/etc/kafka/ssl/keystore.ts
> ssl.keystore.type=JKS
> ssl.secure.random.implementation=SHA1PRNG
> ssl.truststore.location=/etc/kafka/ssl/truststore.ts
> transaction.state.log.min.isr=3
> transaction.state.log.replication.factor=3
> unclean.leader.election.enable=false
> zookeeper.connect=10.135.65.199:2181,10.133.65.199:2181,10.137.64.56:2181,
> zookeeper.metadata.migration.enable=true
>  {code}
> The config on the broker node is the following
> {code}
> $ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties  | grep -v 
> password | sort
> advertised.host.name=kafka0201e3.ahub.sb.eu.ginfra.net
> advertised.listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092
> broker.id=12
> broker.rack=e3
> controller.listener.names=CONTROLLER # added once all controllers were started
> controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093
>  # added once

[jira] [Commented] (KAFKA-17033) Consider replacing the type of the ReplicaKey directory id field.

2024-07-28 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869192#comment-17869192
 ] 

José Armando García Sancio commented on KAFKA-17033:


[~frankvicky] This Jira is targeting ReplicaKey. We can extend the scope of 
this Jira and PR to MetaProperties if we find it useful there. But I am not 
sure if that is the case.

> Consider replacing the type of the ReplicaKey directory id field.
> -
>
> Key: KAFKA-17033
> URL: https://issues.apache.org/jira/browse/KAFKA-17033
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> The current type for the directoryId field in ReplicaKey is Optional<Uuid>. The 
> field is Optional.empty when the directory id is Uuid.ZERO_UUID. This is nice 
> because it forces KRaft to handle the empty case differently.
> The issue with this modeling is that anytime KRaft needs to serialize the 
> directory it does an explicit conversion to the zero uuid when the option is 
> empty.
> It is possible that adding a type like DirectoryId could improve the user 
> experience when dealing with the directory uuid.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17033) Consider replacing the type of the ReplicaKey directory id field.

2024-07-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-17033:
---
Description: 
The current type for the directoryId field in ReplicaKey is Optional<Uuid>. The 
field is Optional.empty when the directory id is Uuid.ZERO_UUID. This is nice 
because it forces KRaft to handle the empty case differently.

The issue with this modeling is that anytime KRaft needs to serialize the 
directory it does an explicit conversion to the zero uuid when the option is 
empty.

It is possible that adding a type like DirectoryId could improve the user 
experience when dealing with the directory uuid.

  was:One way to handle this is to introduce a type called `DirectoryId` that 
just encapsulates a Uuid but it is able to better handle the Uuid.ZERO_UUID 
case.
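
A rough sketch of what such a wrapper type could look like, as suggested in the description above. This is illustrative only: {{DirectoryId}} here is a hypothetical class, {{java.util.UUID}} stands in for Kafka's {{Uuid}}, and the all-zero value stands in for {{Uuid.ZERO_UUID}}.
{code:java}
import java.util.Optional;
import java.util.UUID;

// Illustrative sketch only, not a proposed final API.
final class DirectoryId {
    private static final UUID ZERO = new UUID(0L, 0L);
    private final UUID value; // never null; ZERO encodes "unknown directory"

    private DirectoryId(UUID value) {
        this.value = value;
    }

    static DirectoryId of(UUID value) {
        return new DirectoryId(value == null ? ZERO : value);
    }

    // Callers that must treat "unknown" specially can still do so explicitly.
    Optional<UUID> toOptional() {
        return ZERO.equals(value) ? Optional.empty() : Optional.of(value);
    }

    // Serialization paths always get a concrete value, with no per-call-site
    // Optional-to-zero conversion.
    UUID toSerializable() {
        return value;
    }

    public static void main(String[] args) {
        DirectoryId unknown = DirectoryId.of(null);
        System.out.println(unknown.toOptional().isEmpty()); // true
        System.out.println(unknown.toSerializable());       // all-zero UUID
    }
}
{code}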


> Consider replacing the type of the ReplicaKey directory id field.
> -
>
> Key: KAFKA-17033
> URL: https://issues.apache.org/jira/browse/KAFKA-17033
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> The current type for the directoryId field in ReplicaKey is Optional<Uuid>. The 
> field is Optional.empty when the directory id is Uuid.ZERO_UUID. This is nice 
> because it forces KRaft to handle the empty case differently.
> The issue with this modeling is that anytime KRaft needs to serialize the 
> directory it does an explicit conversion to the zero uuid when the option is 
> empty.
> It is possible that adding a type like DirectoryId could improve the user 
> experience when dealing with the directory uuid.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17033) Consider replacing the type of the ReplicaKey directory id field.

2024-07-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-17033:
---
Summary: Consider replacing the type of the ReplicaKey directory id field.  
(was: Consider replacing the ReplicaKey Optional with just Uuid)

> Consider replacing the type of the ReplicaKey directory id field.
> -
>
> Key: KAFKA-17033
> URL: https://issues.apache.org/jira/browse/KAFKA-17033
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> One way to handle this is to introduce a type called `DirectoryId` that just 
> encapsulates a Uuid but it is able to better handle the Uuid.ZERO_UUID case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17033) Consider replacing the ReplicaKey Optional with just Uuid

2024-07-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio updated KAFKA-17033:
---
Summary: Consider replacing the ReplicaKey Optional with just Uuid  
(was: Consider replacing the directory id Optional with just Uuid)

> Consider replacing the ReplicaKey Optional with just Uuid
> ---
>
> Key: KAFKA-17033
> URL: https://issues.apache.org/jira/browse/KAFKA-17033
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> One way to handle this is to introduce a type called `DirectoryId` that just 
> encapsulates a Uuid but it is able to better handle the Uuid.ZERO_UUID case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16915) Update leader change message

2024-07-28 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-16915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

José Armando García Sancio resolved KAFKA-16915.

  Assignee: Alyssa Huang
Resolution: Fixed

> Update leader change message
> 
>
> Key: KAFKA-16915
> URL: https://issues.apache.org/jira/browse/KAFKA-16915
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Assignee: Alyssa Huang
>Priority: Major
> Fix For: 3.9.0
>
>
> It would be good to include the schema changes as part of 3.9 even if they 
> are not populated. It would make it compatible with kraft.version 1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally

2024-07-28 Thread bboyleonp (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869167#comment-17869167
 ] 

bboyleonp edited comment on KAFKA-16701 at 7/28/24 12:23 PM:
-

[~gharris1727] I have added custom logs to narrow down the issue and found the 
following information:

 

Log for test `closingChannelSendFailure` on JDK 17 ({color:#de350b}Fail{color})
{code:java}
[2024-07-28 19:46:31,751] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:46:31,751] DEBUG Leon: request obtained by callbackQueue is null 
(kafka.request.logger:471)
[2024-07-28 19:46:31,751] DEBUG Leon: the connectionQuotas of inetAddress is 1 
(kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] DEBUG Leon: the connectionQuotas of inetAddress is 0 
(kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] ERROR Exception while processing disconnection of 
127.0.0.1:51325-127.0.0.1:51327-0 
(kafka.network.SocketServerTest$TestableProcessor:76)
java.lang.IllegalArgumentException: Attempted to decrease connection count for 
address with no connections, address: /127.0.0.1
at 
kafka.network.ConnectionQuotas.$anonfun$dec$1(SocketServer.scala:1535)
at scala.collection.mutable.HashMap.getOrElse(HashMap.scala:451)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:1535)
at 
kafka.network.Processor.$anonfun$processDisconnected$1(SocketServer.scala:1225)
at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008)
at kafka.network.Processor.processDisconnected(SocketServer.scala:1216)
at kafka.network.Processor.run(SocketServer.scala:1019)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-07-28 19:46:41,755] DEBUG Leon: request obtained by requestQueue is null 
(kafka.request.logger:476)
[2024-07-28 19:46:41,755] DEBUG Leon: before finally::proxyServer.close 
(kafka:703) {code}
 

Log for test `closingChannelSendFailure` on JDK 11 
({color:#00875a}Success{color})
{code:java}
[2024-07-28 19:24:48,265] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:24:48,265] DEBUG Leon: request obtained by callbackQueue is null 
(kafka.request.logger:471)
[2024-07-28 19:24:48,265] DEBUG Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":0,"requestApiVersion":11,"correlationId":-1,"clientId":"","requestApiKeyName":"PRODUCE"},"request":{"transactionalId":null,"acks":0,"timeoutMs":1,"topicData":[]},"response":"","connection":"127.0.0.1:50229-127.0.0.1:50231-0","totalTimeMs":104.595,"requestQueueTimeMs":0.0,"localTimeMs":3.2080946725E7,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.316,"sendTimeMs":0.03,"securityProtocol":"SSL","principal":"User:ANONYMOUS","listener":"SSL","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
 (kafka.request.logger:279)
[2024-07-28 19:24:48,265] TRACE Socket server received empty response to send, 
registering for read: Response(type=NoOp, request=Request(processor=0, 
connectionId=127.0.0.1:50229-127.0.0.1:50231-0, 
session=org.apache.kafka.network.Session@242ff747, 
listenerName=ListenerName(SSL), securityProtocol=SSL, 
buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None)) 
(kafka.network.SocketServerTest$TestableProcessor:54)
[2024-07-28 19:24:48,266] TRACE Processor 0 received request: 
RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=, correlationId=-1, 
headerVersion=2) -- {acks=0,timeout=1,partitionSizes=[]} 
(kafka.network.RequestChannel$:45)
[2024-07-28 19:24:48,266] DEBUG Leon: request obtained by requestQueue is 
Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, 
session=org.apache.kafka.network.Session@5329f6b3, 
listenerName=ListenerName(SSL), securityProtocol=SSL, 
buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None) 
(kafka.request.logger:476)
[2024-07-28 19:24:48,267] DEBUG Leon: before finally::proxyServer.close 
(kafka:703)
[2024-07-28 19:24:48,267] DEBUG Leon: before sslConnect (kafka:1469)
[2024-07-28 19:24:48,268] DEBUG Leon: before sendRequest (kafka:1471) {code}
 

This test will call `receiveRequest` in _RequestChannel.scala_ to poll for 
`callbackQueue` and `requestQueue`.

I found a daemon in _SocketServer.scala_ that monitors and counts the valid 
connections, which are maintained by `connectionQuotas`. It seems that the 
connections in JDK 17 are disconnected unexpectedly. *Please find the 2 lines 
marked in red.*
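
For what it's worth, the IllegalArgumentException above is the classic symptom of a per-address counter being decremented after its entry was already removed (for example, a double decrement for the same close). A minimal, generic illustration of that failure mode follows; this is a sketch of the pattern, not the SocketServer code, and all names in it are hypothetical.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class ConnectionCountDemo {
    private static final Map<String, Integer> counts = new HashMap<>();

    static void inc(String address) {
        counts.merge(address, 1, Integer::sum);
    }

    static void dec(String address) {
        Integer current = counts.get(address);
        if (current == null) {
            // Same shape as the error in the log above.
            throw new IllegalArgumentException(
                "Attempted to decrease connection count for address with no connections, address: " + address);
        }
        if (current == 1) {
            counts.remove(address);
        } else {
            counts.put(address, current - 1);
        }
    }

    public static void main(String[] args) {
        inc("/127.0.0.1");
        dec("/127.0.0.1");
        dec("/127.0.0.1"); // second decrement for the same close -> throws
    }
}
{code}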

*Could you help to verify if you can find the same behav

[jira] [Commented] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally

2024-07-28 Thread bboyleonp (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869167#comment-17869167
 ] 

bboyleonp commented on KAFKA-16701:
---

[~gharris1727] I have added custom logs to narrow down the issue and found the 
following information:

 

Log for test `closingChannelSendFailure` on JDK 17 ({color:#de350b}Fail{color})

 
{code:java}
[2024-07-28 19:46:31,751] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:46:31,751] DEBUG Leon: request obtained by callbackQueue is null 
(kafka.request.logger:471)
[2024-07-28 19:46:31,751] DEBUG Leon: the connectionQuotas of inetAddress is 1 
(kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] DEBUG Leon: the connectionQuotas of inetAddress is 0 
(kafka.network.SocketServerTest$TestableProcessor:62)
[2024-07-28 19:46:32,053] ERROR Exception while processing disconnection of 
127.0.0.1:51325-127.0.0.1:51327-0 
(kafka.network.SocketServerTest$TestableProcessor:76)
java.lang.IllegalArgumentException: Attempted to decrease connection count for 
address with no connections, address: /127.0.0.1
at 
kafka.network.ConnectionQuotas.$anonfun$dec$1(SocketServer.scala:1535)
at scala.collection.mutable.HashMap.getOrElse(HashMap.scala:451)
at kafka.network.ConnectionQuotas.dec(SocketServer.scala:1535)
at 
kafka.network.Processor.$anonfun$processDisconnected$1(SocketServer.scala:1225)
at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008)
at kafka.network.Processor.processDisconnected(SocketServer.scala:1216)
at kafka.network.Processor.run(SocketServer.scala:1019)
at java.base/java.lang.Thread.run(Thread.java:840)
[2024-07-28 19:46:41,755] DEBUG Leon: request obtained by requestQueue is null 
(kafka.request.logger:476)
[2024-07-28 19:46:41,755] DEBUG Leon: before finally::proxyServer.close 
(kafka:703) {code}
 

 

Log for test `closingChannelSendFailure` on JDK 11 
({color:#00875a}Success{color})

 
{code:java}
[2024-07-28 19:24:48,265] DEBUG Leon: before receiveRequest (kafka:700)
[2024-07-28 19:24:48,265] DEBUG Leon: request obtained by callbackQueue is null 
(kafka.request.logger:471)
[2024-07-28 19:24:48,265] DEBUG Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":0,"requestApiVersion":11,"correlationId":-1,"clientId":"","requestApiKeyName":"PRODUCE"},"request":{"transactionalId":null,"acks":0,"timeoutMs":1,"topicData":[]},"response":"","connection":"127.0.0.1:50229-127.0.0.1:50231-0","totalTimeMs":104.595,"requestQueueTimeMs":0.0,"localTimeMs":3.2080946725E7,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.316,"sendTimeMs":0.03,"securityProtocol":"SSL","principal":"User:ANONYMOUS","listener":"SSL","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
 (kafka.request.logger:279)
[2024-07-28 19:24:48,265] TRACE Socket server received empty response to send, 
registering for read: Response(type=NoOp, request=Request(processor=0, 
connectionId=127.0.0.1:50229-127.0.0.1:50231-0, 
session=org.apache.kafka.network.Session@242ff747, 
listenerName=ListenerName(SSL), securityProtocol=SSL, 
buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None)) 
(kafka.network.SocketServerTest$TestableProcessor:54)
[2024-07-28 19:24:48,266] TRACE Processor 0 received request: 
RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=, correlationId=-1, 
headerVersion=2) -- {acks=0,timeout=1,partitionSizes=[]} 
(kafka.network.RequestChannel$:45)
[2024-07-28 19:24:48,266] DEBUG Leon: request obtained by requestQueue is 
Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, 
session=org.apache.kafka.network.Session@5329f6b3, 
listenerName=ListenerName(SSL), securityProtocol=SSL, 
buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None) 
(kafka.request.logger:476)
[2024-07-28 19:24:48,267] DEBUG Leon: before finally::proxyServer.close 
(kafka:703)
[2024-07-28 19:24:48,267] DEBUG Leon: before sslConnect (kafka:1469)
[2024-07-28 19:24:48,268] DEBUG Leon: before sendRequest (kafka:1471) {code}
 

 

This test will call `receiveRequest` in _RequestChannel.scala_ to poll for 
`callbackQueue` and `requestQueue`.

I found a daemon in _SocketServer.scala_ that monitors and counts the valid 
connections, which are maintained by `connectionQuotas`. It seems that the 
connections in JDK 17 are disconnected unexpectedly. *Please find the 2 lines 
marked in red.*

*Could you help to verify if you can find the same behavior on your environment 
as well?* Her

[jira] [Commented] (KAFKA-13906) Invalid replica state transition

2024-07-28 Thread Gaurav Narula (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869164#comment-17869164
 ] 

Gaurav Narula commented on KAFKA-13906:
---

Hi [~johnnyhsu] I wanted to check if you're still working on this? Please feel 
free to assign it to me otherwise as I think I may have a workaround.

> Invalid replica state transition
> 
>
> Key: KAFKA-13906
> URL: https://issues.apache.org/jira/browse/KAFKA-13906
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, replication
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, 
> 3.2.1
>Reporter: Igor Soarez
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: BUG, controller, replication, reproducible-bug
>
> The controller runs into an IllegalStateException when reacting to changes in 
> broker membership status if there are topics that are pending deletion.
>  
> How to reproduce:
>  # Setup cluster with 3 brokers
>  # Create a topic with a partition being led by each broker and produce some 
> data
>  # Kill one of the brokers that is not the controller, and keep that broker 
> down
>  # Delete the topic
>  # Restart the other broker that is not the controller
>  
> Logs and stacktrace:
> {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed 
> (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> OfflineReplica,ReplicaDeletionStarted states before moving to 
> ReplicaDeletionIneligible state. Instead it is in ReplicaDeletionSuccessful 
> state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}}
> {{        at 
> scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}}
> {{        at 
> kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}}
> {{        at 
> kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}}
> {{        at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}}
> {{        at 
> kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}}
> {{        at 
> kafka.controller.KafkaController.process(KafkaController.scala:2534)}}
> {{        at 
> kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}}
> {{        at 
> kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}}
> {{--}}
> {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 
> epoch 1 initiated state change of replica 3 for partition test-topic-2 from 
> ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}}
> {{java.lang.IllegalStateException: Replica 
> [Topic=test-topic,Partition=2,Replica=3] should be in the 
> NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states 
> before moving to OnlineReplica state. Instead it is in 
> ReplicaDeletionSuccessful state}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}}
> {{        at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}}
> {{        at scala.collection.immutable.List.foreach(List.scala:333)}}
> {{        at 
> kafka

[jira] [Commented] (KAFKA-17206) Use v1 of LeaderChangeMessage when kraft.version is 1

2024-07-28 Thread Meisam Jafari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869152#comment-17869152
 ] 

Meisam Jafari commented on KAFKA-17206:
---

Hi Alyssa Huang, could I take this issue?

> Use v1 of LeaderChangeMessage when kraft.version is 1
> -
>
> Key: KAFKA-17206
> URL: https://issues.apache.org/jira/browse/KAFKA-17206
> Project: Kafka
>  Issue Type: Task
>Reporter: Alyssa Huang
>Priority: Minor
>
> [https://github.com/apache/kafka/pull/16668] introduced v1 of LCM but still 
> uses v0 of the schema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally

2024-07-28 Thread bboyleonp (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869149#comment-17869149
 ] 

bboyleonp commented on KAFKA-16701:
---

[~gharris1727] I agree that we should keep investigating this issue.

It seems the inter-test interaction that causes `closingChannelSendFailure` to 
execute in between `processDisconnectedException` *is not reproducible* on my 
environment anymore, which aligns with our previous thoughts. I am going to set 
aside the previous finding on this one and try to figure out if there's some 
other potential flaw.

 

>  It looks like the full suite almost always fails, and tests on their own 
> almost always pass

I see the same. It seems the problem does not lie in the test cases 
themselves but in the whole suite execution. I am going to investigate in this 
direction to see if I can find any clues.

 

> Opening and closing sockets will inherently change the state of the process, 
> and so its always possible that sockets from one test are being picked up by 
> another. 

That is a great point. I printed out the PID and TID of 
`processDisconnectedException` and `closingChannelSendFailure` for testing 
purposes, and they both use the same PID and TID. There's a chance that 
this is caused by resource reuse.

 

I will update if I have any new findings.

> Some SocketServerTest buffered close tests flaky failing locally
> 
>
> Key: KAFKA-16701
> URL: https://issues.apache.org/jira/browse/KAFKA-16701
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Affects Versions: 3.5.0, 3.6.0, 3.7.0
>Reporter: Greg Harris
>Assignee: bboyleonp
>Priority: Major
>  Labels: flaky-test
>
> These tests are failing for me on a local development environment, but don't 
> appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. 
> I'm using an M1 Mac, so it is possible that either the Mac's linear port 
> allocation, or a native implementation is impacting this.
> closingChannelSendFailure()
>  
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>   at 
> kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434)
>   at 
> kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430)
>   at 
> kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat}
> closingChannelWithBufferedReceivesFailedSend()
>  
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>   at 
> kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat}
> closingChannelWithCompleteAndIncompleteBufferedReceives()
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591)
>   at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590)
>   at 
> kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553)
>   at 
> kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511)
>  {noformat}
> remoteCloseWithBufferedReceives()
> {noformat}
> java.lang.AssertionError: receiveRequest timed out
>   at 
> kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148)
>   at 
> kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(S

[jira] [Updated] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer

2024-07-27 Thread TaiJuWu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

TaiJuWu updated KAFKA-17209:

External issue URL: https://github.com/apache/kafka/pull/16703

> Revisit testCurrentLag for AsyncKafkaConsumer
> -
>
> Key: KAFKA-17209
> URL: https://issues.apache.org/jira/browse/KAFKA-17209
> Project: Kafka
>  Issue Type: Test
>Reporter: TaiJuWu
>Assignee: TaiJuWu
>Priority: Major
>
> The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that 
> ClassicConsumer is single-threaded while AsyncConsumer uses two threads: one 
> application thread and one background thread.
> The {{Fetch}} request is generated by the background thread and the {{ListOffset}} 
> request is generated by the application thread.
> This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} 
> request first.
> So we add an out-of-order feature to {{MockClient}} to resolve the issue.
> Another difference between AsyncConsumer and ClassicConsumer is that the former 
> generates an additional {{FetchOffset}} request from CommitRequestManager, but 
> ClassicConsumer does not issue that request.
> The reason is that the coordinator is not ready during the test for {{ClassicConsumer}}, 
> so we can ignore this request for {{AsyncConsumer}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer

2024-07-27 Thread TaiJuWu (Jira)
TaiJuWu created KAFKA-17209:
---

 Summary: Revisit testCurrentLag for AsyncKafkaConsumer
 Key: KAFKA-17209
 URL: https://issues.apache.org/jira/browse/KAFKA-17209
 Project: Kafka
  Issue Type: Test
Reporter: TaiJuWu
Assignee: TaiJuWu


The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that 
ClassicConsumer is single-threaded while AsyncConsumer uses two threads: one 
application thread and one background thread.

The {{Fetch}} request is generated by the background thread and the {{ListOffset}} 
request is generated by the application thread.
This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} 
request first.
So we add an out-of-order feature to {{MockClient}} to resolve the issue.

Another difference between AsyncConsumer and ClassicConsumer is that the former 
generates an additional {{FetchOffset}} request from CommitRequestManager, but 
ClassicConsumer does not issue that request.

The reason is that the coordinator is not ready during the test for {{ClassicConsumer}}, 
so we can ignore this request for {{AsyncConsumer}}.
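
A rough illustration of the out-of-order idea (not the real MockClient API): instead of answering requests strictly FIFO, a test double can match a prepared response to the first pending request of the right type, so it no longer matters whether the background thread's Fetch or the application thread's ListOffsets arrives first. All class and type names below are hypothetical.
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.Optional;

// Illustrative sketch only; not Kafka client classes.
final class OutOfOrderResponder {
    record PendingRequest(String type, long id) {}

    private final Deque<PendingRequest> pending = new ArrayDeque<>();

    void send(PendingRequest request) {
        pending.addLast(request);
    }

    // Answer the first pending request whose type matches, regardless of arrival order.
    Optional<PendingRequest> respondTo(String type) {
        for (Iterator<PendingRequest> it = pending.iterator(); it.hasNext(); ) {
            PendingRequest r = it.next();
            if (r.type().equals(type)) {
                it.remove();
                return Optional.of(r);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        OutOfOrderResponder client = new OutOfOrderResponder();
        client.send(new PendingRequest("FETCH", 1));        // background thread
        client.send(new PendingRequest("LIST_OFFSETS", 2)); // application thread
        // The test can answer ListOffsets first even though Fetch arrived first.
        System.out.println(client.respondTo("LIST_OFFSETS").orElseThrow());
    }
}
{code}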



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric

2024-07-27 Thread Gaurav Narula (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gaurav Narula reassigned KAFKA-8366:


Assignee: Gaurav Narula

> partitions of topics being deleted show up in the offline partitions metric
> ---
>
> Key: KAFKA-8366
> URL: https://issues.apache.org/jira/browse/KAFKA-8366
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, core, metrics
>Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.2.1
>Reporter: Radai Rosenblatt
>Assignee: Gaurav Narula
>Priority: Major
>  Labels: BUG, controller, metrics, reproducible-bug
>
> I believe this is a bug.
> Offline partitions is a metric that indicates an error condition: lack of 
> Kafka availability.
> As an artifact of how deletion is implemented, the partitions for a topic 
> undergoing deletion will show up as offline, which just creates 
> false-positive alerts.
> If needed, maybe there should exist a separate "partitions to be deleted" 
> sensor.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2

2024-07-26 Thread George Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869062#comment-17869062
 ] 

George Yang edited comment on KAFKA-17186 at 7/27/24 2:45 AM:
--

Great! That solution works well. How did you determine that 'distributed' 
should be removed as a prefix? From what I understand, we typically set 
parameters like 'producer', 'consumer', and 'admin' in the following format:
 * {{{source}.consumer.\{consumer_config_name}}}
 * {{{target}.producer.\{producer_config_name}}}
 * {{{source_or_target}.admin.\{admin_config_name}}}

These conventions are recommended by the Kafka official documentation. However, 
for custom Kafka Connect settings such as DistributedConfig, 
JsonConverterConfig, ConnectorConfig, SourceConnectorConfig, 
EnrichedConnectorConfig, MirrorCheckpointConfig, MirrorHeartbeatConfig, 
MirrorCheckpointTaskConfig, and others, can I simply omit '\{someConfig}' and 
set them using '\{alias}.\{some_config_name}'?


was (Author: JIRAUSER302264):
Great! That solution works well. How did you determine that 'distributed' 
should be removed as a prefix? From what I understand, we typically set 
parameters like 'producer', 'consumer', and 'admin' in the following format:
 * {{{source}.consumer.\{consumer_config_name}}}
 * {{{target}.producer.\{producer_config_name}}}
 * {{{source_or_target}.admin.\{admin_config_name}}}

These conventions are recommended by the Kafka official documentation. However, 
for custom Kafka Connect settings such as DistributedConfig, 
JsonConverterConfig, ConnectorConfig, SourceConnectorConfig, 
EnrichedConnectorConfig, MirrorCheckpointConfig, MirrorHeartbeatConfig, 
MirrorCheckpointTaskConfig, and others, can I simply omit '\{someConfig}' and 
set them using '\{alias}.\{some_config_name}'?

> Cannot receive message after stopping Source Mirror Maker 2
> ---
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>Reporter: George Yang
>Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 
> 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. 
> Currently, a service on node 1 in Data Center A acts as a producer sending 
> messages to the `myTest` topic. A service in Data Center B acts as a consumer 
> listening to `A.myTest`. 
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer 
> in Data Center B ceases to receive messages. Even after I restarting MM2 in 
> Data Center A, the consumer in Data Center B still does not receive messages 
> until approximately 5 minutes later when a rebalance occurs, at which point 
> it begins receiving messages again.
>  
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully joined group with generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully synced group in generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sess

[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2

2024-07-26 Thread George Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869062#comment-17869062
 ] 

George Yang commented on KAFKA-17186:
-

Great! That solution works well. How did you determine that 'distributed' 
should be removed as a prefix? From what I understand, we typically set 
parameters like 'producer', 'consumer', and 'admin' in the following format:
 * {{{source}.consumer.\{consumer_config_name}}}
 * {{{target}.producer.\{producer_config_name}}}
 * {{{source_or_target}.admin.\{admin_config_name}}}

These conventions are recommended by the Kafka official documentation. However, 
for custom Kafka Connect settings such as DistributedConfig, 
JsonConverterConfig, ConnectorConfig, SourceConnectorConfig, 
EnrichedConnectorConfig, MirrorCheckpointConfig, MirrorHeartbeatConfig, 
MirrorCheckpointTaskConfig, and others, can I simply omit '\{someConfig}' and 
set them using '\{alias}.\{some_config_name}'?
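
For what it's worth, a hypothetical fragment illustrating those prefixes with the A/B aliases from this setup. The keys shown are standard client and worker configs, but whether a given worker-level setting accepts a plain '\{alias}.' prefix is exactly the open question above, so treat this as a sketch rather than a reference.
{code:java}
# Documented client-level prefixes: {cluster_alias}.{client}.{config_name}
A.consumer.max.poll.records=500
B.producer.acks=all
A.admin.request.timeout.ms=30000

# Worker-level setting set per alias without a class-name ('distributed.') prefix,
# as discussed above; illustrative only.
B.offset.flush.timeout.ms=10000
{code}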

> Cannot receive message after stopping Source Mirror Maker 2
> ---
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>Reporter: George Yang
>Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 
> 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. 
> Currently, a service on node 1 in Data Center A acts as a producer sending 
> messages to the `myTest` topic. A service in Data Center B acts as a consumer 
> listening to `A.myTest`. 
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer 
> in Data Center B ceases to receive messages. Even after I restarting MM2 in 
> Data Center A, the consumer in Data Center B still does not receive messages 
> until approximately 5 minutes later when a rebalance occurs, at which point 
> it begins receiving messages again.
>  
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully joined group with generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully synced group in generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined 
> group at generation 52 with protocol version 2 and got assignment: 
> Assignment\{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', 
> leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], 
> taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, 
> MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], 
> delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting 
> connectors and tasks using config offset 1360 
> (org.apache.kafka.connect.runtime.distributed.Distribu

[jira] [Commented] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients

2024-07-26 Thread Xuze Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869050#comment-17869050
 ] 

Xuze Yang commented on KAFKA-17207:
---

Hi [~gharris1727], if you're not working on this, may I take it? Thank you.

> ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking 
> clients
> -
>
> Key: KAFKA-17207
> URL: https://issues.apache.org/jira/browse/KAFKA-17207
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie
>
> The testRequestTimeouts deletes the internal config topic, putting the 
> connect workers into a bad state. When the test goes to clean up, it calls 
> DistributedHerder#stop, which waits for the herder executor to stop. This 
> times out, because the herder executor is blocked closing the 
> KafkaConfigBackingStore's producer. This log message gets printed:
> {noformat}
> [2024-07-26 11:52:50,817] ERROR Executor 
> java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not 
> terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat}
> This effectively leaks the Kafka clients for the workers' internal topics, 
> and the herder executor thread. Instead, either the producer should not block 
> indefinitely on a missing topic, or the cluster state should be healed enough 
> for the producer to shut down cleanly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14830) Illegal state error in transactional producer

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14830:
--
Priority: Critical  (was: Major)

> Illegal state error in transactional producer
> -
>
> Key: KAFKA-14830
> URL: https://issues.apache.org/jira/browse/KAFKA-14830
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.1.2
>Reporter: Jason Gustafson
>Assignee: Kirk True
>Priority: Critical
>
> We have seen the following illegal state error in the producer:
> {code:java}
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topic-0:120027 ms has passed since batch creation
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> topic-1:120026 ms has passed since batch creation
> [Producer clientId=client-id2, transactionalId=transactional-id] Aborting 
> incomplete transaction
> [Producer clientId=client-id2, transactionalId=transactional-id] Invoking 
> InitProducerId with current producer ID and epoch 
> ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch
> [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId 
> set to 191799 with epoch 1
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.NetworkException: Disconnected from node 4
> [Producer clientId=client-id2, transactionalId=transactional-id] Transiting 
> to abortable error state due to 
> org.apache.kafka.common.errors.TimeoutException: The request timed out.
> [Producer clientId=client-id2, transactionalId=transactional-id] Uncaught 
> error in request completion:
> java.lang.IllegalStateException: TransactionalId transactional-id: Invalid 
> transition attempted from state READY to state ABORTABLE_ERROR
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734)
>         at 
> org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575)
>         at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562)
>         at java.base/java.lang.Iterable.forEach(Iterable.java:75)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836)
>         at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
>         at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328)
>         at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243)
>         at java.base/java.lang.Thread.run(Thread.java:829)
>  {code}
> The producer hits timeouts which cause it to abort an active transaction. 
> After aborting, the producer bumps its epoch, which transitions it back to 
> the `READY` state. Following this, there are two errors for inflight 
> requests, which cause an illegal state transition to `ABORTABLE_ERROR`. But 
> how could the transaction ABORT complete if there were still inflight 
> requests? 
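
For context, a minimal sketch of the public transactional producer flow whose 
READY/ABORTABLE_ERROR transitions are discussed above (this is illustrative 
application-level usage, not the internal TransactionManager code; the topic, 
ids, and bootstrap address are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalFlowSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactional-id");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("topic-0", "key", "value"));
                producer.commitTransaction();
            } catch (KafkaException e) {
                // A batch timeout moves the producer to ABORTABLE_ERROR; aborting
                // (and the subsequent epoch bump) is what returns it to READY.
                producer.abortTransaction();
            }
        }
    }
}
{code}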



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15402:
--
Priority: Critical  (was: Major)

> Performance regression on close consumer after upgrading to 3.5.0
> -
>
> Key: KAFKA-15402
> URL: https://issues.apache.org/jira/browse/KAFKA-15402
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.5.0, 3.6.0, 3.5.1
>Reporter: Benoit Delbosc
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
> Attachments: image-2023-08-24-18-51-21-720.png, 
> image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png
>
>
> Hi,
> After upgrading to Kafka client version 3.5.0, we have observed a significant 
> increase in the duration of our Java unit tests. These unit tests heavily 
> rely on the Kafka Admin, Producer, and Consumer API.
> When using Kafka server version 3.4.1, the duration of the unit tests 
> increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka 
> client 3.5.0).
> Upgrading the Kafka server to 3.5.1 shows similar results.
> I have come across the issue KAFKA-15178, which could be the culprit. I will 
> attempt to test the proposed patch.
> In the meantime, if you have any ideas that could help identify and address 
> the regression, please let me know.
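
One way to narrow this down is simply to time the close call on both client 
versions; a minimal sketch, assuming a consumer that has already been 
constructed and used (the 30-second bound is arbitrary):
{code:java}
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class CloseTiming {
    private CloseTiming() {}

    // Returns how long KafkaConsumer#close(Duration) took, in milliseconds,
    // so the 3.4.x and 3.5.x clients can be compared directly.
    public static long timedCloseMs(KafkaConsumer<?, ?> consumer) {
        long start = System.nanoTime();
        consumer.close(Duration.ofSeconds(30));
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
{code}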



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16444) Run KIP-848 unit tests under code coverage

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-16444:
--
Priority: Minor  (was: Major)

> Run KIP-848 unit tests under code coverage
> --
>
> Key: KAFKA-16444
> URL: https://issues.apache.org/jira/browse/KAFKA-16444
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 3.9.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17182) Consumer's fetch sessions are evicted too quickly

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17182:
--
Priority: Critical  (was: Major)

> Consumer's fetch sessions are evicted too quickly
> -
>
> Key: KAFKA-17182
> URL: https://issues.apache.org/jira/browse/KAFKA-17182
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: consumer-threading-refactor
> Fix For: 4.0.0
>
>
> In stress testing, the new consumer evicts fetch sessions on the broker much 
> more frequently than expected. There is an 
> ongoing investigation into this behavior, but it appears to stem from a race 
> condition due to the design of the new consumer.
> In the background thread, fetch requests are sent in a near continuous 
> fashion for partitions that are "fetchable." A timing bug appears to cause 
> partitions to be "unfetchable," which then causes them to end up in the 
> "removed" set of partitions. The broker then removes them from the fetch 
> session, which causes the number of remaining partitions for that session to 
> drop below a threshold that allows it to be evicted by another competing 
> session. Within a few milliseconds, though, the partitions become "fetchable" 
> again, and are added to the "added" set of partitions on the next fetch 
> request. This causes thrashing on both the client and broker sides as both 
> are handling a steady stream of evictions, which negatively affects 
> consumption throughput.
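
For anyone trying to observe this, the size of the broker-side incremental 
fetch session cache is controlled by the setting below (shown with what I 
believe is the default); raising it, or watching the FetchSessionCache 
eviction metrics, should make the thrashing easier to see:
{noformat}
# broker config (server.properties)
max.incremental.fetch.session.cache.slots=1000
{noformat}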



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15320) Document event queueing patterns

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15320:
-

Assignee: (was: Kirk True)

> Document event queueing patterns
> 
>
> Key: KAFKA-15320
> URL: https://issues.apache.org/jira/browse/KAFKA-15320
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, documentation
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, events
> Fix For: 3.9.0
>
>
> We need to first document the event enqueuing patterns in the 
> PrototypeAsyncConsumer. As part of this task, determine if it’s 
> necessary/beneficial to _conditionally_ add events and/or coalesce any 
> duplicate events in the queue.
> _Don’t forget to include diagrams for clarity!_
> This should be documented on the AK wiki.
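
As a strawman for the coalescing part of the discussion, a generic sketch of 
keeping at most one pending event per key before hand-off to the background 
thread; this is not the PrototypeAsyncConsumer code, and the key/event types 
are stand-ins:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class CoalescingEventBuffer<K, E> {
    // At most one pending event per key; a newer event replaces an older
    // duplicate, while insertion order of distinct keys is preserved.
    private final Map<K, E> pending = new LinkedHashMap<>();

    public synchronized void enqueue(K key, E event) {
        pending.put(key, event);
    }

    // Drained by the background thread in one shot.
    public synchronized Map<K, E> drain() {
        Map<K, E> drained = new LinkedHashMap<>(pending);
        pending.clear();
        return drained;
    }
}
{code}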



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17183) New consumer system tests pass for subset of tests, but fail if running all tests

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-17183:
--
Priority: Blocker  (was: Major)

> New consumer system tests pass for subset of tests, but fail if running all 
> tests
> -
>
> Key: KAFKA-17183
> URL: https://issues.apache.org/jira/browse/KAFKA-17183
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Blocker
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 3.9.0
>
>
> In 3.8, many of the system tests were updated to exercise both the old 
> consumer and the new consumer. For quicker feedback, I have been creating a 
> YAML-based test suite for just the subset of tests that were updated. When 
> running the system tests in the subset, I am consistently seeing 100% pass 
> rate. Recently I started running the entire test suite, and am now seeing 
> many failures.
> The cause is as yet unknown, but I am adding this here as a task to fix whatever 
> is causing this behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15638:
-

Assignee: (was: Kirk True)

> Investigate ConsumerNetworkThreadTest's testPollResultTimer
> ---
>
> Key: KAFKA-15638
> URL: https://issues.apache.org/jira/browse/KAFKA-15638
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, unit tests
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor, timeout, unit-tests
> Fix For: 3.9.0
>
>
> Regarding this comment in {{{}testPollResultTimer{}}}...
> {code:java}
> // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE 
> upon success
> {code}
> [~junrao] asked:
> {quote}Which call is returning Long.MAX_VALUE?
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16290) Investigate propagating subscription state updates via queues

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16290:
-

Assignee: (was: Kirk True)

> Investigate propagating subscription state updates via queues
> -
>
> Key: KAFKA-16290
> URL: https://issues.apache.org/jira/browse/KAFKA-16290
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Lucas Brutschy
>Priority: Critical
>  Labels: consumer-threading-refactor, kip-848, 
> kip-848-client-support
> Fix For: 3.9.0
>
>
> We are mostly using the queues for interaction between application thread and 
> background thread, but the subscription object is shared between the threads, 
> and it is updated directly without going through the queues. 
> The way we allow updates to the subscription state from both threads is 
> definitely not right and will bring trouble. Places like assign() are 
> probably the most obvious: we send an event to the background thread to 
> commit, but then update the subscription in the foreground right away.
> It seems sensible to aim for having all updates to the subscription state in 
> the background, triggered from the app thread via events (and I think we 
> already have related events for all updates, just that the subscription state 
> was left out in the app thread).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16109) Write system tests cover the "simple consumer + commit" use case

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16109:
-

Assignee: (was: Kirk True)

> Write system tests cover the "simple consumer + commit" use case
> 
>
> Key: KAFKA-16109
> URL: https://issues.apache.org/jira/browse/KAFKA-16109
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer, system tests
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor, system-tests
> Fix For: 4.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16315) Investigate propagating metadata updates via queues

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16315:
-

Assignee: (was: Kirk True)

> Investigate propagating metadata updates via queues
> ---
>
> Key: KAFKA-16315
> URL: https://issues.apache.org/jira/browse/KAFKA-16315
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> Some of the new {{AsyncKafkaConsumer}} logic enqueues events for the network 
> I/O thread and then issues a call to update the {{ConsumerMetadata}} via 
> {{requestUpdate()}}. If the event ends up stuck in the queue for a long time, 
> it is possible that the metadata is not updated at the correct time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15173) Consumer event queues should be bounded

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-15173:
-

Assignee: (was: Kirk True)

> Consumer event queues should be bounded
> ---
>
> Key: KAFKA-15173
> URL: https://issues.apache.org/jira/browse/KAFKA-15173
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Priority: Minor
>  Labels: consumer-threading-refactor, events
> Fix For: 3.9.0
>
>
> The async consumer uses ApplicationEventQueue and BackgroundEventQueue to 
> facilitate message passing between the application thread and the background 
> thread.  The current implementation is boundless, which can potentially cause 
> OOM and other performance-related issues.
> I think the queues need a finite bound, and we need to decide how to handle 
> the situation when the bound is reached.  In particular, I would like to 
> answer these questions:
>  
>  # What should the upper limit be for both queues: Can this be a 
> configurable, memory-based bound? Or just an arbitrary number of events as 
> the bound.
>  # What should happen when the application event queue is filled up?  It 
> seems like we should introduce a new exception type and notify the user that 
> the consumer is full.
>  # What should happen when the background event queue is filled up? This 
> seems less likely to happen, but I imagine it could happen when the user 
> stops polling the consumer, causing the queue to be filled.
>  # Is it necessary to introduce a public configuration for the queue? I think 
> initially we would select an arbitrary constant number and gather community 
> feedback to plan a way forward accordingly.
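
A minimal sketch of one answer to question 2 above (fail fast when the 
application event queue is full); the capacity, class, and exception names are 
made up for illustration and are not from the consumer code:
{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedEventQueue<E> {
    // Hypothetical exception type signalling that the consumer cannot accept
    // more application events right now.
    public static class EventQueueFullException extends RuntimeException {
        public EventQueueFullException(String message) { super(message); }
    }

    private final BlockingQueue<E> queue;

    public BoundedEventQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity); // e.g. an arbitrary 1024
    }

    public void add(E event) {
        // offer() is non-blocking, so the caller is told immediately instead of
        // the queue growing without bound and risking OOM.
        if (!queue.offer(event)) {
            throw new EventQueueFullException("application event queue is full");
        }
    }

    public E poll() {
        return queue.poll();
    }
}
{code}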



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16642:
-

Assignee: (was: Kirk True)

> Update KafkaConsumerTest to show parameters in test lists
> -
>
> Key: KAFKA-16642
> URL: https://issues.apache.org/jira/browse/KAFKA-16642
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Priority: Minor
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> {{KafkaConsumerTest}} was recently updated to make many of its tests 
> parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group 
> protocols. However, in some of the tools in which [lists of tests are 
> provided|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.consumer.KafkaConsumerTest=FLAKY],
>  say, for analysis, the group protocol information is not exposed. For 
> example, one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but 
> it's difficult to know at a glance which group protocol is causing the 
> problem because the list simply shows:
> {quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}}
> {quote}
> Ideally, it would expose more information, such as:
> {quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}}
> {quote}
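
If this ends up being addressed with JUnit 5's display-name support, a sketch 
of the shape it might take (the test body is omitted and this is not the actual 
KafkaConsumerTest code):
{code:java}
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

public class GroupProtocolDisplayNameTest {
    // The name pattern embeds the argument, so reports show e.g.
    // "testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)".
    @ParameterizedTest(name = "testReturnRecordsDuringRebalance(GroupProtocol={0})")
    @EnumSource(GroupProtocol.class)
    void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) {
        // test body omitted; groupProtocol would drive the consumer configuration
    }
}
{code}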



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16966) Allow offset commit fetch to reuse previous request if partitions are a subset

2024-07-26 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True reassigned KAFKA-16966:
-

Assignee: (was: Kirk True)

> Allow offset commit fetch to reuse previous request if partitions are a subset
> --
>
> Key: KAFKA-16966
> URL: https://issues.apache.org/jira/browse/KAFKA-16966
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Priority: Major
>  Labels: consumer-threading-refactor
> Fix For: 3.9.0
>
>
> In {{{}initWithCommittedOffsetsIfNeeded{}}}, the behavior of the existing 
> {{LegacyKafkaConsumer}} is to allow reuse only if the partitions for the 
> _current_ request equal those of the _previous_ request *exactly* 
> ([source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java?rgh-link-date=2024-06-14T16%3A43%3A11Z#L927]).
>  That logic is the basis for the behavior used in the 
> {{{}AsyncKafkaConsumer{}}}.
> The proposed change is to allow for request reuse if the partitions for the 
> _current_ request are a subset of those of the _previous_ request. This 
> introduces a subtle difference in behavior between the two {{Consumer}} 
> implementations, so we need to decide if we want to change both 
> implementations or just {{{}AsyncKafkaConsumer{}}}. Also, the specific case 
> that the request reuse logic solves is when the user has passed in a very low 
> timeout value in a tight {{poll()}} loop, which suggests the partitions 
> wouldn't be changing between those loops.
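
For concreteness, the proposal amounts to relaxing an exact-equality check to a 
subset check, roughly as below (a sketch against plain java.util collections, 
not the actual ConsumerCoordinator code):
{code:java}
import java.util.Set;
import org.apache.kafka.common.TopicPartition;

public final class OffsetFetchReuseCheck {
    private OffsetFetchReuseCheck() {}

    // Existing behavior: reuse the pending offset fetch only on an exact match.
    public static boolean canReuseExact(Set<TopicPartition> current, Set<TopicPartition> previous) {
        return current.equals(previous);
    }

    // Proposed behavior: reuse whenever the current request only asks for
    // partitions the previous request already covers.
    public static boolean canReuseSubset(Set<TopicPartition> current, Set<TopicPartition> previous) {
        return previous.containsAll(current);
    }
}
{code}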



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17208) replica_scale_test.py fails for new consumer

2024-07-26 Thread Kirk True (Jira)
Kirk True created KAFKA-17208:
-

 Summary: replica_scale_test.py fails for new consumer
 Key: KAFKA-17208
 URL: https://issues.apache.org/jira/browse/KAFKA-17208
 Project: Kafka
  Issue Type: Bug
  Components: clients, consumer, system tests
Affects Versions: 3.8.0
Reporter: Kirk True
Assignee: Kirk True
 Fix For: 4.0.0


{{replica_scale_test}}’s {{test_produce_consume}} fails when using a 
{{group_protocol}} of {{CONSUMER}}:
 
{noformat}
TimeoutError('replicas-consume-workload failed to finish in the expected amount 
of time.')
Traceback (most recent call last):
  File 
"/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/tests/runner_client.py",
 line 183, in _do_run
data = self.run_test()
  File 
"/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/tests/runner_client.py",
 line 243, in run_test
return self.test_context.function(self.test)
  File 
"/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/mark/_mark.py",
 line 433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/home/kafka/tests/kafkatest/tests/core/replica_scale_test.py", line 
116, in test_produce_consume
consume_workload.wait_for_done(timeout_sec=600)
  File "/home/kafka/tests/kafkatest/services/trogdor/trogdor.py", line 352, in 
wait_for_done
wait_until(lambda: self.done(),
  File 
"/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/utils/util.py",
 line 58, in wait_until
raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from 
last_exception
ducktape.errors.TimeoutError: replicas-consume-workload failed to finish in the 
expected amount of time.
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers

2024-07-26 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-17202:

Priority: Minor  (was: Major)

> EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset 
> leaks consumers
> --
>
> Key: KAFKA-17202
> URL: https://issues.apache.org/jira/browse/KAFKA-17202
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Minor
>  Labels: newbie
>
> This method creates a KafkaConsumer, but does not close it.
> We can use a try-with-resources to ensure the consumer is closed prior to 
> returning or throwing from this function.
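
A sketch of the suggested shape of the fix, with placeholder generics and 
configuration (not the actual test code):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public final class ChangelogOffsetVerification {
    private ChangelogOffsetVerification() {}

    static void verify(Properties consumerConfig) {
        // try-with-resources guarantees the consumer is closed even if the
        // assertions below throw.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            // ... read changelog end offsets and compare against the checkpointed offset ...
        }
    }
}
{code}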



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients

2024-07-26 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-17207:

Affects Version/s: 3.9.0

> ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking 
> clients
> -
>
> Key: KAFKA-17207
> URL: https://issues.apache.org/jira/browse/KAFKA-17207
> Project: Kafka
>  Issue Type: Test
>  Components: connect
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Priority: Minor
>  Labels: newbie
>
> The testRequestTimeouts deletes the internal config topic, putting the 
> connect workers into a bad state. When the test goes to clean up, it calls 
> DistributedHerder#stop, which waits for the herder executor to stop. This 
> times out, because the herder executor is blocked closing the 
> KafkaConfigBackingStore's producer. This log message gets printed:
> {noformat}
> [2024-07-26 11:52:50,817] ERROR Executor 
> java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = 
> 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not 
> terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat}
> This effectively leaks the Kafka clients for the workers' internal topics, 
> and the herder executor thread. Instead, either the producer should not block 
> indefinitely on a missing topic, or the cluster state should be healed enough 
> for the producer to shut down cleanly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients

2024-07-26 Thread Greg Harris (Jira)
Greg Harris created KAFKA-17207:
---

 Summary: ConnectWorkerIntegrationTest.testRequestTimeouts times 
out in stop(), leaking clients
 Key: KAFKA-17207
 URL: https://issues.apache.org/jira/browse/KAFKA-17207
 Project: Kafka
  Issue Type: Test
  Components: connect
Reporter: Greg Harris


The testRequestTimeouts deletes the internal config topic, putting the connect 
workers into a bad state. When the test goes to clean up, it calls 
DistributedHerder#stop, which waits for the herder executor to stop. This times 
out, because the herder executor is blocked closing the 
KafkaConfigBackingStore's producer. This log message gets printed:
{noformat}
[2024-07-26 11:52:50,817] ERROR Executor 
java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = 1, 
active threads = 1, queued tasks = 0, completed tasks = 0] did not terminate in 
time (org.apache.kafka.common.utils.ThreadUtils:83){noformat}

This effectively leaks the Kafka clients for the workers' internal topics, and 
the herder executor thread. Instead, either the producer should not block 
indefinitely on a missing topic, or the cluster state should be healed enough 
for the producer to shut down cleanly.
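
As one possible direction for the first option (the producer should not block 
indefinitely), the close could be bounded; a minimal sketch, with an arbitrary 
timeout and none of the surrounding KafkaConfigBackingStore wiring:
{code:java}
import java.time.Duration;
import org.apache.kafka.clients.producer.Producer;

public final class BoundedProducerShutdown {
    private BoundedProducerShutdown() {}

    // Closing with an explicit timeout lets worker shutdown make progress even
    // when the config topic is gone and outstanding sends can never complete.
    static void closeWithTimeout(Producer<byte[], byte[]> producer) {
        producer.close(Duration.ofSeconds(30));
    }
}
{code}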



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state

2024-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-17112:
---

Assignee: Ao Li

> StreamThread shutdown calls completeShutdown only in CREATED state
> --
>
> Key: KAFKA-17112
> URL: https://issues.apache.org/jira/browse/KAFKA-17112
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 3.9.0
>Reporter: Ao Li
>Assignee: Ao Li
>Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed 
> the test left many lingering threads. Though the class runs `shutdown` after 
> each test, the shutdown only executes `completeShutdown` if the StreamThread 
> is in CREATED state. See 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
>  and 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>  
> For example, you may run test 
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
>  with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls 
> `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus, 
> `completeShutdown` is not called. The test creates three lingering threads: 2 
> `StateUpdater` and 1 `TaskExecutor`
>  
> This means that calls to `thread.shutdown` have no effect in 
> `StreamThreadTest.java`. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-17204) KafkaStreamsCloseOptionsIntegrationTest.before leaks AdminClient

2024-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-17204.
-
Fix Version/s: 3.9.0
   Resolution: Fixed

> KafkaStreamsCloseOptionsIntegrationTest.before leaks AdminClient
> 
>
> Key: KAFKA-17204
> URL: https://issues.apache.org/jira/browse/KAFKA-17204
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Minor
>  Labels: newbie
> Fix For: 3.9.0
>
>
> The before method creates an AdminClient, but this client is never closed. It 
> should be closed either in `after` or `closeCluster`.
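
A generic sketch of what closing it in `after` could look like (the field and 
method names here are illustrative, not the actual test class):
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.junit.jupiter.api.AfterEach;

public class AdminClientTeardownSketch {
    private Admin adminClient; // created in the test's before()/setup method

    @AfterEach
    public void after() {
        // Null-safe close so teardown still works if setup failed part-way.
        if (adminClient != null) {
            adminClient.close();
            adminClient = null;
        }
    }
}
{code}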



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-17206) Use v1 of LeaderChangeMessage when kraft.version is 1

2024-07-26 Thread Alyssa Huang (Jira)
Alyssa Huang created KAFKA-17206:


 Summary: Use v1 of LeaderChangeMessage when kraft.version is 1
 Key: KAFKA-17206
 URL: https://issues.apache.org/jira/browse/KAFKA-17206
 Project: Kafka
  Issue Type: Task
Reporter: Alyssa Huang


[https://github.com/apache/kafka/pull/16668] introduced v1 of LCM but still 
uses v0 of the schema.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-17203) StreamThread leaking producer instances

2024-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-17203:
---

Assignee: PoAn Yang

> StreamThread leaking producer instances
> ---
>
> Key: KAFKA-17203
> URL: https://issues.apache.org/jira/browse/KAFKA-17203
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: newbie
>
> When running 
> EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled
>  with the KAFKA-15845 leak testing extension, I 
> observed that this test appears to consistently leak StreamsProducers. The 
> producer is instantiated here:
> {noformat}
> This test contains a resource leak. Close the resources, or open a KAFKA 
> ticket and annotate this class with 
> @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
> org.opentest4j.AssertionFailedError: This test contains a resource leak. 
> Close the resources, or open a KAFKA ticket and annotate this class with 
> @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
>     at 
> org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:98)
>     at 
> org.apache.kafka.common.network.LeakTestingExtension$All.afterAll(LeakTestingExtension.java:123)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> Caused by: org.opentest4j.AssertionFailedError: Leak check failed
>     at 
> org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:89)
>     at 
> org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:96)
>     ... 2 more
>     Suppressed: org.opentest4j.AssertionFailedError: AbstractSelector 
> instances left open
>         at 
> org.apache.kafka.common.utils.PredicateLeakTester.lambda$start$0(PredicateLeakTester.java:94)
>         at 
> org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:86)
>         ... 3 more
>         Suppressed: java.lang.Exception: Opened sun.nio.ch.KQueueSelectorImpl
>             at 
> org.apache.kafka.common.utils.PredicateLeakTester.open(PredicateLeakTester.java:63)
>             at 
> org.apache.kafka.common.network.NetworkContextLeakTester$RecordingSelectorProvider.openSelector(NetworkContextLeakTester.java:135)
>             at 
> org.apache.kafka.common.network.TestNetworkContext$SelectorProviderDecorator.openSelector(TestNetworkContext.java:166)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:160)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:213)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:225)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:229)
>             at 
> org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:225)
>             at 
> org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:163)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:526)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:465)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:297)
>             at 
> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
>             at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.(StreamsProducer.java:142)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createRecordCollector(ActiveTaskCreator.java:196)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:265)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:176)
>             at 
> org.apache.kafka.streams.processor.internals.TaskManager.createNewTasks(TaskManager.java:441)
>             at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:390)
>             at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
>             at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(Consumer

[jira] [Commented] (KAFKA-17203) StreamThread leaking producer instances

2024-07-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868995#comment-17868995
 ] 

Matthias J. Sax commented on KAFKA-17203:
-

Thanks for your interest! Go for it!

> StreamThread leaking producer instances
> ---
>
> Key: KAFKA-17203
> URL: https://issues.apache.org/jira/browse/KAFKA-17203
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: PoAn Yang
>Priority: Minor
>  Labels: newbie
>
> When running 
> EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled
>  with the KAFKA-15845 leak testing extension, I 
> observed that this test appears to consistently leak StreamsProducers. The 
> producer is instantiated here:
> {noformat}
> This test contains a resource leak. Close the resources, or open a KAFKA 
> ticket and annotate this class with 
> @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
> org.opentest4j.AssertionFailedError: This test contains a resource leak. 
> Close the resources, or open a KAFKA ticket and annotate this class with 
> @LeakTestingExtension.IgnoreAll("KAFKA-XYZ")
>     at 
> org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:98)
>     at 
> org.apache.kafka.common.network.LeakTestingExtension$All.afterAll(LeakTestingExtension.java:123)
>     at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
> Caused by: org.opentest4j.AssertionFailedError: Leak check failed
>     at 
> org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:89)
>     at 
> org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:96)
>     ... 2 more
>     Suppressed: org.opentest4j.AssertionFailedError: AbstractSelector 
> instances left open
>         at 
> org.apache.kafka.common.utils.PredicateLeakTester.lambda$start$0(PredicateLeakTester.java:94)
>         at 
> org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:86)
>         ... 3 more
>         Suppressed: java.lang.Exception: Opened sun.nio.ch.KQueueSelectorImpl
>             at 
> org.apache.kafka.common.utils.PredicateLeakTester.open(PredicateLeakTester.java:63)
>             at 
> org.apache.kafka.common.network.NetworkContextLeakTester$RecordingSelectorProvider.openSelector(NetworkContextLeakTester.java:135)
>             at 
> org.apache.kafka.common.network.TestNetworkContext$SelectorProviderDecorator.openSelector(TestNetworkContext.java:166)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:160)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:213)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:225)
>             at 
> org.apache.kafka.common.network.Selector.(Selector.java:229)
>             at 
> org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:225)
>             at 
> org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:163)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:526)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:465)
>             at 
> org.apache.kafka.clients.producer.KafkaProducer.(KafkaProducer.java:297)
>             at 
> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
>             at 
> org.apache.kafka.streams.processor.internals.StreamsProducer.(StreamsProducer.java:142)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createRecordCollector(ActiveTaskCreator.java:196)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:265)
>             at 
> org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:176)
>             at 
> org.apache.kafka.streams.processor.internals.TaskManager.createNewTasks(TaskManager.java:441)
>             at 
> org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:390)
>             at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559)
>             at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(Consumer

[jira] [Assigned] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers

2024-07-26 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-17202:
---

Assignee: TengYao Chi

> EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset 
> leaks consumers
> --
>
> Key: KAFKA-17202
> URL: https://issues.apache.org/jira/browse/KAFKA-17202
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Major
>  Labels: newbie
>
> This method creates a KafkaConsumer, but does not close it.
> We can use a try-with-resources to ensure the consumer is closed prior to 
> returning or throwing from this function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers

2024-07-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868993#comment-17868993
 ] 

Matthias J. Sax commented on KAFKA-17202:
-

Thanks for your interest in picking it up. Go for it!

> EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset 
> leaks consumers
> --
>
> Key: KAFKA-17202
> URL: https://issues.apache.org/jira/browse/KAFKA-17202
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Assignee: TengYao Chi
>Priority: Major
>  Labels: newbie
>
> This method creates a KafkaConsumer, but does not close it.
> We can use a try-with-resources to ensure the consumer is closed prior to 
> returning or throwing from this function.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17101) Mirror maker internal topics cleanup policy changes to 'delete' from 'compact'

2024-07-26 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868990#comment-17868990
 ] 

Chris Egerton commented on KAFKA-17101:
---

Spoke too soon: it looks like the test failures we're seeing are due to a bug in 
our testing logic, not Kafka (of course). Still, it'd be useful to know more 
about your Kafka cluster in case the issue you're experiencing is due to a 
broker bug.

> Mirror maker internal topics cleanup policy changes to 'delete' from 
> 'compact' 
> ---
>
> Key: KAFKA-17101
> URL: https://issues.apache.org/jira/browse/KAFKA-17101
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.1, 3.5.1, 3.6.1
>Reporter: kaushik srinivas
>Priority: Major
>
> Scenario/Setup details
> Kafka cluster 1: 3 replicas
> Kafka cluster 2: 3 replicas
> MM1 moving data from cluster 1 to cluster 2
> MM2 moving data from cluster 2 to cluster 1
> Sometimes, after a reboot of Kafka cluster 1 and the MM1 instance, we observe 
> MM failing to come up with the exception below:
> {code:java}
> {"message":"DistributedHerder-connect-1-1 - 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker 
> clientId=connect-1, groupId=site1-mm2] Uncaught exception in herder work 
> thread, exiting: "}}
> org.apache.kafka.common.config.ConfigException: Topic 
> 'mm2-offsets.site1.internal' supplied via the 'offset.storage.topic' property 
> is required to have 'cleanup.policy=compact' to guarantee consistency and 
> durability of source connector offsets, but found the topic currently has 
> 'cleanup.policy=delete'. Continuing would likely result in eventually losing 
> source connector offsets and problems restarting this Connect cluster in the 
> future. Change the 'offset.storage.topic' property in the Connect worker 
> configurations to use a topic with 'cleanup.policy=compact'. {code}
> Once the topic is altered to a cleanup policy of compact, MM works just fine.
> This is happening sporadically on our setups and across a variety of 
> scenarios. We have not yet been able to identify exact reproduction steps.
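
For anyone hitting this, the manual workaround mentioned above (altering the 
topic back to a compacted policy) can be applied with the standard tooling; the 
broker address is a placeholder and the topic name is the one from this report:
{noformat}
bin/kafka-configs.sh --bootstrap-server <broker>:9092 \
  --entity-type topics --entity-name mm2-offsets.site1.internal \
  --alter --add-config cleanup.policy=compact
{noformat}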



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2

2024-07-26 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868989#comment-17868989
 ] 

Greg Harris commented on KAFKA-17186:
-

I think you want `.scheduled.rebalance.max.delay.ms`, without the 
"distributed".

> Cannot receive message after stopping Source Mirror Maker 2
> ---
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>Reporter: George Yang
>Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 
> 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. 
> Currently, a service on node 1 in Data Center A acts as a producer sending 
> messages to the `myTest` topic. A service in Data Center B acts as a consumer 
> listening to `A.myTest`. 
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer 
> in Data Center B ceases to receive messages. Even after I restart MM2 in 
> Data Center A, the consumer in Data Center B still does not receive messages 
> until approximately 5 minutes later when a rebalance occurs, at which point 
> it begins receiving messages again.
>  
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully joined group with generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully synced group in generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined 
> group at generation 52 with protocol version 2 and got assignment: 
> Assignment\{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', 
> leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], 
> taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, 
> MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], 
> delay=0} with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting 
> connectors and tasks using config offset 1360 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished 
> starting connectors and tasks 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] 
> Successfully joined group with generati

[jira] [Commented] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state

2024-07-26 Thread Ao Li (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868965#comment-17868965
 ] 

Ao Li commented on KAFKA-17112:
---

Hi [~cadonna], I've submitted a patch. Could you please take a look at it when 
you have time?

> StreamThread shutdown calls completeShutdown only in CREATED state
> --
>
> Key: KAFKA-17112
> URL: https://issues.apache.org/jira/browse/KAFKA-17112
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 3.9.0
>Reporter: Ao Li
>Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed 
> the test left many lingering threads. Though the class runs `shutdown` after 
> each test, the shutdown only executes `completeShutdown` if the StreamThread 
> is in CREATED state. See 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
>  and 
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>  
> For example, you may run test 
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
>  with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls 
> `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus, 
> `completeShutdown` is not called. The test creates three lingering threads: 2 
> `StateUpdater` and 1 `TaskExecutor`
>  
> This means that calls to `thread.shutdown` have no effect in 
> `StreamThreadTest.java`. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-17033) Consider replacing the directory id Optional with just Uuid

2024-07-26 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868940#comment-17868940
 ] 

TengYao Chi commented on KAFKA-17033:
-

Hi [~jsancio] 

Is this ticket talking about the `directoryId` field of `MetaProperties` 
and `ReplicaKey`?

> Consider replacing the directory id Optional with just Uuid
> -
>
> Key: KAFKA-17033
> URL: https://issues.apache.org/jira/browse/KAFKA-17033
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: José Armando García Sancio
>Priority: Major
>
> One way to handle this is to introduce a type called `DirectoryId` that just 
> encapsulates a Uuid but it is able to better handle the Uuid.ZERO_UUID case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2

2024-07-26 Thread George Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868842#comment-17868842
 ] 

George Yang edited comment on KAFKA-17186 at 7/26/24 11:21 AM:
---

Thanks [~gharris1727] again.

Sorry for the confusion: "affect the old Kafka cluster" just means the two 
Kafka clusters are located in different data centers; please ignore the word 
'old'.

To test the rebalance delay, I attempted to set the parameter 
{{scheduled.rebalance.max.delay.ms=2}} in 
{{{}connect-mirror-maker.properties{}}}. However, the log file 
{{mirrormaker.out}} continued to show the old value of 
{{scheduled.rebalance.max.delay.ms = 30}} under the DistributedConfig 
section. Subsequently, I tried setting it as follows:

A. {{distributed.scheduled.rebalance.max.delay.ms=2}}

B. {{distributed.scheduled.rebalance.max.delay.ms=2}}

Nevertheless, {{mirrormaker.out}} still indicates the default value of 30.

How should I configure this parameter to make it effective?

 


was (Author: JIRAUSER302264):
Thanks [~gharris1727] again.

Sorry for the confusion, "affect the old Kafka cluster" is just mean the two 
Kafka clusters locate in each data center, please ignore the word old.

To test the rebalance delay, I attempted to set the parameter 
{{scheduled.rebalance.max.delay.ms=2}} in 
{{{}connect-mirror-maker.properties{}}}. However, the log file 
{{mirrormaker.out}} continued to show the old value of 
{{scheduled.rebalance.max.delay.ms = 30}} under the DistributedConfig 
section. Subsequently, I tried setting it as follows:

A. {{distributed.scheduled.rebalance.max.delay.ms=2}}

B. {{distributed.scheduled.rebalance.max.delay.ms=2}}

Nevertheless, {{mirrormaker.out}} still indicates the default value of 30.

How should I configure this parameter to make it effective?

 

> Cannot receive message after stopping Source Mirror Maker 2
> ---
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
> CPU(s): 32
> Memory: 32G/1.1G free
> Target Kafka Cluster standalone Node:
> CPU(s): 24
> Memory: 30G/909M free
> Kafka Version 3.7
> Mirrormaker Version 3.7.1
>Reporter: George Yang
>Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
>
> Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 
> 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. 
> Currently, a service on node 1 in Data Center A acts as a producer sending 
> messages to the `myTest` topic. A service in Data Center B acts as a consumer 
> listening to `A.myTest`. 
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer 
> in Data Center B ceases to receive messages. Even after I restarting MM2 in 
> Data Center A, the consumer in Data Center B still does not receive messages 
> until approximately 5 minutes later when a rebalance occurs, at which point 
> it begins receiving messages again.
>  
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing 
> consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Rebalance started 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] 
> (Re-)joining group 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully joined group with generation Generation\{generationId=52, 
> memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} 
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] 
> Successfully synced group in generation Generation\{generationId=52, 
>

[jira] [Commented] (KAFKA-17201) SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks sockets and threads

2024-07-26 Thread TengYao Chi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868902#comment-17868902
 ] 

TengYao Chi commented on KAFKA-17201:
-

Hello [~gharris1727] 

May I have this ticket?

Many thanks :)

> SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks 
> sockets and threads
> 
>
> Key: KAFKA-17201
> URL: https://issues.apache.org/jira/browse/KAFKA-17201
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.9.0
>Reporter: Greg Harris
>Priority: Minor
>  Labels: flaky-test, newbie
>
> This test creates multiple Threads which in turn open sockets. It is possible 
> that these threads and sockets outlive the test itself, as the threads are 
> not joined before the test completes.
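
A hedged sketch of the usual remedy (illustrative only; this is not the SelectorTest code and the names here are made up): start the helper threads, then join them with a timeout before the test returns, so neither the threads nor the sockets they open can outlive the test.

{code:java}
import java.util.ArrayList;
import java.util.List;

public final class JoinHelpersBeforeExit {

    // Runs each task on its own thread and joins all of them before returning,
    // so nothing started here survives past the caller (e.g. a test method).
    public static void runAndJoin(List<Runnable> tasks, long timeoutMs) throws InterruptedException {
        List<Thread> helpers = new ArrayList<>();
        for (Runnable task : tasks) {
            Thread t = new Thread(task, "test-helper-" + helpers.size());
            t.start();
            helpers.add(t);
        }
        for (Thread t : helpers) {
            t.join(timeoutMs);
        }
    }
}
{code}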



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17205) Allow topic config validation in controller level in KRaft mode

2024-07-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-17205:
--
Summary: Allow topic config validation in controller level in KRaft mode  
(was: Allow topic config validation in controller level )

> Allow topic config validation in controller level in KRaft mode
> ---
>
> Key: KAFKA-17205
> URL: https://issues.apache.org/jira/browse/KAFKA-17205
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Allow topic config validation at the controller level. This is required 
> because we need to fail an invalid config change before it is written into 
> the metadata log, especially for the tiered storage feature.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17205) Allow topic config validation in controller level in KRaft mode

2024-07-26 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-17205:
--
Description: 
Allow topic config validation at the controller level. This is required because 
we need to fail an invalid config change before it is written into the metadata 
log, especially for the tiered storage feature.

 

Note: This ticket only covers KRaft mode.

  was:Allow topic config validation in controller level. This is required 
because we need to fail the invalid config change before it is written into 
metadata log, especially for tiered storage feature.


> Allow topic config validation in controller level in KRaft mode
> ---
>
> Key: KAFKA-17205
> URL: https://issues.apache.org/jira/browse/KAFKA-17205
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Allow topic config validation at the controller level. This is required 
> because we need to fail an invalid config change before it is written into 
> the metadata log, especially for the tiered storage feature.
>  
> Note: This ticket only covers KRaft mode.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated

2024-07-26 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868895#comment-17868895
 ] 

Sagar Rao edited comment on KAFKA-8115 at 7/26/24 8:52 AM:
---

Failed again in 
[this|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16628/5/#showFailuresLink]
 build. 

 

 
{code:java}
java.util.concurrent.TimeoutException: testTaskRequestWithOldStartMsGetsUpdated() timed out after 240 seconds
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	Suppressed: java.lang.InterruptedException
		at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1765)
		at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
		at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:780)
		at org.apache.kafka.trogdor.coordinator.TaskManager.waitForShutdown(TaskManager.java:686)
		at org.apache.kafka.trogdor.coordinator.Coordinator.waitForShutdown(Coordinator.java:131)
		at org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:288)
		at org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:594)
		at java.base/java.lang.reflect.Method.invoke(Method.java:580)
		... 2 more
{code}
 


was (Author: sagarrao):
Failed again in 
[this|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16628/5/#showFailuresLink]
 build. 

```

java.util.concurrent.TimeoutException: 
testTaskRequestWithOldStartMsGetsUpdated() timed out after 240 seconds at 
java.base/java.util.ArrayList.forEach(ArrayList.java:1596) at 
java.base/java.util.ArrayList.forEach(ArrayList.java:1596) Suppressed: 
java.lang.InterruptedException at 
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1765)
 at 
java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475)
 at 
java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:780)
 at 
org.apache.kafka.trogdor.coordinator.TaskManager.waitForShutdown(TaskManager.java:686)
 at 
org.apache.kafka.trogdor.coordinator.Coordinator.waitForShutdown(Coordinator.java:131)
 at 
org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:288)
 at 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:594)
 at java.base/java.lang.reflect.Method.invoke(Method.java:580) ... 2 more

```

> Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
> ---
>
> Key: KAFKA-8115
> URL: https://issues.apache.org/jira/browse/KAFKA-8115
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Greg Harris
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/]
> {quote}org.junit.runners.model.TestTimedOutException: test timed out after 
> 12 milliseconds at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native 
> Method) at 
> java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
>  at 
> java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
>  at 
> java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
>  at 
> java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
>  at 
> app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157)
>  at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123) 
> at 
> app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285)
>  at 
> app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596)
>  at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@11.0.1/ja
