[jira] [Resolved] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2
[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

George Yang resolved KAFKA-17186.
--------------------------------
Resolution: Not A Problem

configuration issues, fixed

> Cannot receive message after stopping Source Mirror Maker 2
> -----------------------------------------------------------
>
> Key: KAFKA-17186
> URL: https://issues.apache.org/jira/browse/KAFKA-17186
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 3.7.1
> Environment: Source Kafka Cluster per Node:
>   CPU(s): 32
>   Memory: 32G/1.1G free
>   Target Kafka Cluster standalone Node:
>   CPU(s): 24
>   Memory: 30G/909M free
>   Kafka Version 3.7
>   Mirrormaker Version 3.7.1
> Reporter: George Yang
> Priority: Major
> Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out
>
> Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1.
> Currently, a service on node 1 in Data Center A acts as a producer sending messages to the `myTest` topic. A service in Data Center B acts as a consumer listening to `A.myTest`.
> The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer in Data Center B ceases to receive messages. Even after restarting MM2 in Data Center A, the consumer in Data Center B still does not receive messages until approximately 5 minutes later when a rebalance occurs, at which point it begins receiving messages again.
>
> [Logs From Consumer on Data Center B]
> [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully joined group with generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Successfully synced group in generation Generation{generationId=52, memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined group at generation 52 with protocol version 2 and got assignment: Assignment{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting connectors and tasks using config offset 1360 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921)
> [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242)
> [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] (Re-)joining group (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604)
> [2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] Successfully joined group with generation Generation{generationId=143, memberId='A->B-0d04e6c1-f12a-4
[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2
[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869474#comment-17869474 ]

George Yang commented on KAFKA-17186:
-------------------------------------

OK, [~gharris1727] Thank you for your explanation and research.
[jira] [Updated] (KAFKA-17215) Remove get-prefix for all getters
[ https://issues.apache.org/jira/browse/KAFKA-17215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-17215:
------------------------------------
Description:
Kafka traditionally does not use a `get` prefix for getter methods. However, for multiple public interfaces, we don't follow this common pattern, but actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good opportunity to deprecate existing methods and add them back with the "correct" name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We do know of the following:
* StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, getKafkaClientSupplier – for some of these, we might even consider removing them; it's questionable if it makes sense to have them in the public API (cf. https://github.com/apache/kafka/pull/14548) – we should also consider https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
* TopologyConfig (getTaskConfig)
* KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, getGlobalConsumer)
* Contexts (maybe not worth it... we might deprecate the whole class soon):
** ProcessorContext (getStateStore)
** MockProcessorContext (getStateStore)
** MockProcessorContext#CapturedPunctuator (getIntervalMs, getType, getPunctuator)
* api.ProcessingContext (getStateStore)
** api.FixedKeyProcessorContext (getStateStore)
** api.MockProcessorContext (getStateStore, getStateStoreContext)
** api.MockProcessorContext#CapturedPunctuator (getInterval, getType, getPunctuator)
* StateStore (getPosition)
* IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o deprecation period, but might be nasty...)
** KeyQuery (getKey)
** Position (getTopics, getPartitionPositions)
** QueryResult (getExecutionInfo, getPosition, getFailureReason, getFailureMessage, getResult)
** RangeQuery (getLowerBound, getUpperBound)
** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
** StateQueryResult (getPartitionResults, getOnlyPartitionResult, getGlobalResult, getPosition)
** WindowKeyQuery (getKey, getTimeFrom, getTimeTo)
** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)
* TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, getTimestampedWindowStore, getSessionStore)
* TestOutputTopic (getQueueSize)

was:
Kafka traditionally does not use a `get` prefix for getter methods. However, for multiple public interfaces, we don't follow this common pattern, but actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good opportunity to deprecate existing methods and add them back with the "correct" name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We do know of the following:
* StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, getKafkaClientSupplier – for some of these, we might even consider removing them; it's questionable if it makes sense to have them in the public API (cf. https://github.com/apache/kafka/pull/14548) – we should also consider https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
* TopologyConfig (getTaskConfig)
* KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, getGlobalConsumer)
* Contexts (maybe not worth it... we might deprecate the whole class soon):
** ProcessorContext (getStateStore)
** MockProcessorContext (getStateStore)
** MockProcessorContext#CapturedPunctuator (getIntervalMs, getType, getPunctuator)
* api.ProcessingContext (getStateStore)
** api.FixedKeyProcessorContext (getStateStore)
** api.MockProcessorContext (getStateStore)
** api.MockProcessorContext#CapturedPunctuator (getInterval, getType, getPunctuator)
* StateStore (getPosition)
* IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o deprecation period, but might be nasty...)
** KeyQuery (getKey)
** Position (getTopics, getPartitionPositions)
** QueryResult (getExecutionInfo, getPosition, getFailureReason, getFailureMessage, getResult)
** RangeQuery (getLowerBound, getUpperBound)
** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
** StateQueryResult (getPartitionResults, getOnlyPartitionResult, getGlobalResult, getPosition)
** WindowKeyQuery (getKey, getTimeFrom, getTimeTo)
** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)
* TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, getTimestampedKeyValueStore, getVersionedKeyValueS
[jira] [Updated] (KAFKA-17215) Remove get-prefix for all getters
[ https://issues.apache.org/jira/browse/KAFKA-17215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-17215:
------------------------------------
Description:
Kafka traditionally does not use a `get` prefix for getter methods. However, for multiple public interfaces, we don't follow this common pattern, but actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good opportunity to deprecate existing methods and add them back with the "correct" name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We do know of the following:
* StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, getKafkaClientSupplier – for some of these, we might even consider removing them; it's questionable if it makes sense to have them in the public API (cf. https://github.com/apache/kafka/pull/14548) – we should also consider https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
* TopologyConfig (getTaskConfig)
* KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, getGlobalConsumer)
* Contexts (maybe not worth it... we might deprecate the whole class soon):
** ProcessorContext (getStateStore)
** MockProcessorContext (getStateStore)
** MockProcessorContext#CapturedPunctuator (getIntervalMs, getType, getPunctuator)
* api.ProcessingContext (getStateStore)
** api.FixedKeyProcessorContext (getStateStore)
** api.MockProcessorContext (getStateStore)
** api.MockProcessorContext#CapturedPunctuator (getInterval, getType, getPunctuator)
* StateStore (getPosition)
* IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o deprecation period, but might be nasty...)
** KeyQuery (getKey)
** Position (getTopics, getPartitionPositions)
** QueryResult (getExecutionInfo, getPosition, getFailureReason, getFailureMessage, getResult)
** RangeQuery (getLowerBound, getUpperBound)
** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
** StateQueryResult (getPartitionResults, getOnlyPartitionResult, getGlobalResult, getPosition)
** WindowKeyQuery (getKey, getTimeFrom, getTimeTo)
** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)
* TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, getTimestampedWindowStore, getSessionStore)
* TestOutputTopic (getQueueSize)

was:
Kafka traditionally does not use a `get` prefix for getter methods. However, for multiple public interfaces, we don't follow this common pattern, but actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good opportunity to deprecate existing methods and add them back with the "correct" name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We do know of the following:
* StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, getKafkaClientSupplier – for some of these, we might even consider removing them; it's questionable if it makes sense to have them in the public API (cf. https://github.com/apache/kafka/pull/14548) – we should also consider https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
* TopologyConfig (getTaskConfig)
* KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, getGlobalConsumer)
* Contexts (maybe not worth it... we might deprecate the whole class soon):
** ProcessorContext (getStateStore)
** MockProcessorContext (getStateStore)
* api.ProcessingContext (getStateStore)
** api.FixedKeyProcessorContext (getStateStore)
** api.MockProcessorContext (getStateStore)
* StateStore (getPosition)
* IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o deprecation period, but might be nasty...)
** KeyQuery (getKey)
** Position (getTopics, getPartitionPositions)
** QueryResult (getExecutionInfo, getPosition, getFailureReason, getFailureMessage, getResult)
** RangeQuery (getLowerBound, getUpperBound)
** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
** StateQueryResult (getPartitionResults, getOnlyPartitionResult, getGlobalResult, getPosition)
** WindowKeyQuery (getKey, getTimeFrom, getTimeTo)
** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)
* TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, getTimestampedWindowStore, getSessionStore)
* TestOutputTopic (getQueueSize)

> Remove get-prefix for all getters
> ---------------------------------
>
> Key: KAFKA-17215
>
[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2
[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869468#comment-17869468 ]

Greg Harris commented on KAFKA-17186:
-------------------------------------

I don't think I've ever thought about it like that, but your system does make some sense. The reality is less perfect :)

I looked at this implementation: [https://github.com/apache/kafka/blob/2cf87bff9b6b5136a22539edf48b2d7cc668bdf9/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L180] which forwards all .* configurations to the worker configuration, which is the input to the DistributedConfig. It just happens to not include "distributed" in the prefix.

The producer/consumer/admin pattern is something that the Worker/WorkerConfig specifies: [https://github.com/apache/kafka/blob/0ec520a2af05ecd6956b5e645e257d72ad5b8fbd/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L833] and MirrorMaker2 doesn't follow the same pattern.
[jira] [Created] (KAFKA-17215) Remove get-prefix for all getters
Matthias J. Sax created KAFKA-17215:
------------------------------------

Summary: Remove get-prefix for all getters
Key: KAFKA-17215
URL: https://issues.apache.org/jira/browse/KAFKA-17215
Project: Kafka
Issue Type: Improvement
Components: streams, streams-test-utils
Reporter: Matthias J. Sax

Kafka traditionally does not use a `get` prefix for getter methods. However, for multiple public interfaces, we don't follow this common pattern, but actually have a get-prefix.

We might want to clean this up. The upcoming 4.0 release might be a good opportunity to deprecate existing methods and add them back with the "correct" name.

We should maybe also do multiple smaller KIPs instead of just one big KIP. We do know of the following:
* StreamsConfig (getMainConsumerConfigs, getRestoreConsumerConfigs, getGlobalConsumerConfigs, getProducerConfigs, getAdminConfigs, getClientTags, getKafkaClientSupplier – for some of these, we might even consider removing them; it's questionable if it makes sense to have them in the public API (cf. https://github.com/apache/kafka/pull/14548) – we should also consider https://issues.apache.org/jira/browse/KAFKA-16945 for this work)
* TopologyConfig (getTaskConfig)
* KafkaClientSupplier (getAdmin, getProducer, getConsumer, getRestoreConsumer, getGlobalConsumer)
* Contexts (maybe not worth it... we might deprecate the whole class soon):
** ProcessorContext (getStateStore)
** MockProcessorContext (getStateStore)
* api.ProcessingContext (getStateStore)
** api.FixedKeyProcessorContext (getStateStore)
** api.MockProcessorContext (getStateStore)
* StateStore (getPosition)
* IQv2: officially an evolving API (maybe we can rename in 4.0 directly w/o deprecation period, but might be nasty...)
** KeyQuery (getKey)
** Position (getTopics, getPartitionPositions)
** QueryResult (getExecutionInfo, getPosition, getFailureReason, getFailureMessage, getResult)
** RangeQuery (getLowerBound, getUpperBound)
** StateQueryRequest (getStoreName, getPositionBound, getQuery, getPartitions)
** StateQueryResult (getPartitionResults, getOnlyPartitionResult, getGlobalResult, getPosition)
** WindowKeyQuery (getKey, getTimeFrom, getTimeTo)
** WindowRangeQuery (getKey, getTimeFrom, getTimeTo)
* TopologyTestDriver (getAllStateStores, getStateStore, getKeyValueStore, getTimestampedKeyValueStore, getVersionedKeyValueStore, getWindowStore, getTimestampedWindowStore, getSessionStore)
* TestOutputTopic (getQueueSize)

--
This message was sent by Atlassian Jira (v8.20.10#820010)
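The "deprecate existing methods and add them back with the correct name" approach from the KAFKA-17215 description might look roughly like the sketch below. It is only an illustration: `KeyQuerySketch` is a hypothetical stand-in rather than the real Kafka Streams `KeyQuery`, and the final method names would be up to the KIP(s).

```java
/**
 * Hypothetical stand-in for a class that currently exposes a get-prefixed accessor.
 * This is NOT the real Kafka Streams KeyQuery; all names are for illustration only.
 */
final class KeyQuerySketch<K> {
    private final K key;

    KeyQuerySketch(final K key) {
        this.key = key;
    }

    /** New accessor following the no-get-prefix convention. */
    K key() {
        return key;
    }

    /**
     * Old accessor, kept during the deprecation period and delegating to the new one.
     *
     * @deprecated use {@link #key()} instead.
     */
    @Deprecated
    K getKey() {
        return key();
    }

    public static void main(final String[] args) {
        final KeyQuerySketch<String> query = new KeyQuerySketch<>("user-42");
        // Both accessors return the same value while the old one is still present.
        System.out.println(query.key());
        System.out.println(query.getKey());
    }
}
```

Delegating the old method to the new one keeps a single source of truth, so existing callers keep working (with a deprecation warning) until the old name is removed.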
[jira] [Updated] (KAFKA-17211) kafka producer get stuck by Uncaught error in kafka producer I/O thread
[ https://issues.apache.org/jira/browse/KAFKA-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-17211:
------------------------------
Component/s: producer

> kafka producer get stuck by Uncaught error in kafka producer I/O thread
> -----------------------------------------------------------------------
>
> Key: KAFKA-17211
> URL: https://issues.apache.org/jira/browse/KAFKA-17211
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 2.8.2
> Reporter: yapeng
> Priority: Major
>
> I have a Kafka producer that keeps reporting this error message indefinitely.
> {code:java}
> [Producer clientId=xxx] Uncaught error in kafka producer I/O thread:
> java.util.NoSuchElementException: null
>     at java.util.ArrayDeque.getFirst(ArrayDeque.java:329)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.expiredBatches(RecordAccumulator.java:306)
>     at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:372)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:326)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:255)
>     at java.lang.Thread.run(Thread.java:750)
> {code}
> It seems weird that there are null values in the Deque. The machine has already been restarted to resolve the production issue, so there is no heap dump. Please share any insights about this issue.
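For context on the stack trace in KAFKA-17211: `java.util.ArrayDeque` does not permit null elements, and `ArrayDeque.getFirst()` throws `NoSuchElementException` when the deque is empty. The trace therefore points to an empty deque rather than a null entry, and the trailing `null` is most likely just the exception's absent message. The sketch below only demonstrates that JDK behaviour; it does not explain how the producer reached this state. `EmptyDequeDemo` and `getFirstThrows` are illustrative names, not Kafka code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

final class EmptyDequeDemo {

    /** Returns true if calling getFirst() on the given deque throws NoSuchElementException. */
    static boolean getFirstThrows(final Deque<String> deque) {
        try {
            deque.getFirst();
            return false;
        } catch (final NoSuchElementException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        final Deque<String> batches = new ArrayDeque<>();

        // Empty deque: getFirst() throws, while peekFirst() returns null instead.
        System.out.println(getFirstThrows(batches)); // true
        System.out.println(batches.peekFirst());     // null

        // Non-empty deque: getFirst() succeeds.
        batches.add("batch-0");
        System.out.println(getFirstThrows(batches)); // false
    }
}
```

Code that cannot guarantee a non-empty deque typically uses `peekFirst()` and checks for null, or guards the call with `isEmpty()`.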
[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close
[ https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869467#comment-17869467 ]

Kirk True commented on KAFKA-17116:
-----------------------------------

My understanding is:
# This is an edge case
# The broker contains logic to remove stale members

If the above two statements are true, what's the downside of letting the broker handle the member as it does today, namely removing it after some configured amount of time?

I agree that it makes sense to augment the client logic to only send the 'leave group' request if we have a member ID.

At this point, I'm -1 on the idea of introducing temporary IDs to handle edge cases like this.

> New consumer may not send effective leave group if member ID received after close
> ---------------------------------------------------------------------------------
>
> Key: KAFKA-17116
> URL: https://issues.apache.org/jira/browse/KAFKA-17116
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.8.0
> Reporter: Lianet Magrans
> Assignee: TengYao Chi
> Priority: Major
> Labels: kip-848-client-support
> Fix For: 3.9.0
>
> If the new consumer is closed after sending a HB to join, but before receiving the response to it, it will send a leave group request but without member ID (will simply fail with UNKNOWN_MEMBER_ID). As a result, the broker will have registered a new member for which it will never receive a leave request.
> # consumer.subscribe -> sends HB to join, transitions to JOINING
> # consumer.close -> will transition to LEAVING and send HB with epoch -1 (without waiting for in-flight requests)
> # consumer receives response to initial HB, containing the assigned member ID. It will simply ignore it because it's not in the group anymore (UNSUBSCRIBED)
>
> Note that the expectation, with the current logic, and main downsides of this are:
> # If the member received partitions on the first HB, those partitions won't be re-assigned (broker waiting for the closed consumer to reconcile them) until the rebalance timeout expires.
> # Even if no partitions were assigned to it, the member will remain in the group from the broker's point of view (but not from the client's POV). The member will eventually be kicked out for not sending HBs, but only when its session timeout expires.
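The client-side guard suggested in the KAFKA-17116 comment (only send the leave group request when a member ID is known) could be sketched as below. This is a hypothetical illustration, not the actual consumer code: `LeaveGroupSketch`, `CloseAction`, and `onClose` are invented names, and the real client tracks this state in its own membership manager.

```java
import java.util.Optional;

final class LeaveGroupSketch {

    /** What the (hypothetical) client does about group membership when it is closed. */
    enum CloseAction { SEND_LEAVE_GROUP, SKIP_LEAVE_GROUP }

    /**
     * Decides whether a leave group request is worth sending on close.
     * Without a member ID the request cannot identify the member, so it is
     * skipped and the broker is left to expire the member on its own timeout.
     */
    static CloseAction onClose(final Optional<String> memberId) {
        final boolean hasMemberId = memberId.map(id -> !id.isEmpty()).orElse(false);
        return hasMemberId ? CloseAction.SEND_LEAVE_GROUP : CloseAction.SKIP_LEAVE_GROUP;
    }

    public static void main(final String[] args) {
        // Join response already received: the member ID is known.
        System.out.println(onClose(Optional.of("member-1")));
        // Closed before the first heartbeat response arrived: no member ID yet.
        System.out.println(onClose(Optional.empty()));
    }
}
```

Under this sketch the edge case from the issue still leaves a stale member on the broker, which is then removed by the broker's existing timeout handling, as the comment proposes.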
[jira] [Updated] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16390:
------------------------------
Component/s: clients

> consume_bench_test.py failed using AsyncKafkaConsumer
> -----------------------------------------------------
>
> Key: KAFKA-16390
> URL: https://issues.apache.org/jira/browse/KAFKA-16390
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer, system tests
> Reporter: Philip Nee
> Assignee: TengYao Chi
> Priority: Major
> Labels: kip-848-client-support
>
> Ran the system test based on KAFKA-16273
> The following tests failed using the consumer group protocol
> {code:java}
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> {code}
> Because of
> {code:java}
> TimeoutError('consume_workload failed to finish in the expected amount of time.')
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
>     data = self.run_test()
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
>     return self.test_context.function(self.test)
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", line 146, in test_single_partition
>     consume_workload.wait_for_done(timeout_sec=180)
>   File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line 352, in wait_for_done
>     wait_until(lambda: self.done(),
>   File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line 58, in wait_until
>     raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception
> ducktape.errors.TimeoutError: consume_workload failed to finish in the expected amount of time.
> {code}
[jira] [Commented] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869464#comment-17869464 ] Kirk True commented on KAFKA-16390: --- [~frankvicky] transferring the Jira to you and marking as Patch Available. Thanks! > consume_bench_test.py failed using AsyncKafkaConsumer > - > > Key: KAFKA-16390 > URL: https://issues.apache.org/jira/browse/KAFKA-16390 > Project: Kafka > Issue Type: Task > Components: consumer, system tests >Reporter: Philip Nee >Assignee: TengYao Chi >Priority: Major > Labels: kip-848-client-support > > Ran the system test based on KAFKA-16273 > The following tests failed using the consumer group protocol > {code:java} > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > {code} > Because of > {code:java} > TimeoutError('consume_workload failed to finish in the expected amount of > time.') > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", > line 146, in test_single_partition > 
consume_workload.wait_for_done(timeout_sec=180) > File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line > 352, in wait_for_done > wait_until(lambda: self.done(), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: consume_workload failed to finish in the > expected amount of time. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16390: - Assignee: TengYao Chi (was: Kirk True) > consume_bench_test.py failed using AsyncKafkaConsumer > - > > Key: KAFKA-16390 > URL: https://issues.apache.org/jira/browse/KAFKA-16390 > Project: Kafka > Issue Type: Task > Components: consumer, system tests >Reporter: Philip Nee >Assignee: TengYao Chi >Priority: Major > Labels: kip-848-client-support > > Ran the system test based on KAFKA-16273 > The following tests failed using the consumer group protocol > {code:java} > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > {code} > Because of > {code:java} > TimeoutError('consume_workload failed to finish in the expected amount of > time.') > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", > line 146, in test_single_partition > consume_workload.wait_for_done(timeout_sec=180) > File 
"/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line > 352, in wait_for_done > wait_until(lambda: self.done(), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: consume_workload failed to finish in the > expected amount of time. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-17209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17209: -- Labels: kip-848-client-support (was: ) > Revisit testCurrentLag for AsyncKafkaConsumer > - > > Key: KAFKA-17209 > URL: https://issues.apache.org/jira/browse/KAFKA-17209 > Project: Kafka > Issue Type: Test > Components: clients, unit tests >Reporter: TaiJuWu >Assignee: TaiJuWu >Priority: Major > Labels: kip-848-client-support > > The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that > ClassicConsumer is single-threaded while AsyncConsumer uses two threads: an > application thread and a background thread. > The {{Fetch}} request is generated by the background thread and the {{ListOffset}} > request is generated by the application thread. > This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} > request first. > So we add out-of-order support to {{MockClient}} to resolve the issue. > Another difference between AsyncConsumer and ClassicConsumer is that the former > generates an additional {{FetchOffset}} request from CommitRequestManager, which > ClassicConsumer does not send. > The reason is that the coordinator is not ready during the test for {{ClassicConsumer}}, so > we can ignore this request for {{AsyncConsumer}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-17209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17209: -- Component/s: clients unit tests > Revisit testCurrentLag for AsyncKafkaConsumer > - > > Key: KAFKA-17209 > URL: https://issues.apache.org/jira/browse/KAFKA-17209 > Project: Kafka > Issue Type: Test > Components: clients, unit tests >Reporter: TaiJuWu >Assignee: TaiJuWu >Priority: Major > > The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that > ClassicConsumer is single-threaded while AsyncConsumer uses two threads: an > application thread and a background thread. > The {{Fetch}} request is generated by the background thread and the {{ListOffset}} > request is generated by the application thread. > This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} > request first. > So we add out-of-order support to {{MockClient}} to resolve the issue. > Another difference between AsyncConsumer and ClassicConsumer is that the former > generates an additional {{FetchOffset}} request from CommitRequestManager, which > ClassicConsumer does not send. > The reason is that the coordinator is not ready during the test for {{ClassicConsumer}}, so > we can ignore this request for {{AsyncConsumer}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
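The out-of-order idea in the KAFKA-17209 description can be illustrated with a small sketch. This is not the real {{MockClient}} from the Kafka test utilities: the class `OutOfOrderMockSketch`, its `prepareResponse` and `send` methods, and the use of plain strings for requests and responses are all hypothetical and exist only to show the matching technique. The assumption is that each prepared response carries a matcher, and an incoming request takes the first prepared response whose matcher accepts it rather than strictly the head of the queue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical mock, NOT the real Kafka MockClient. Requests and responses are
// plain strings so the sketch stays self-contained.
class OutOfOrderMockSketch {
    private static final class Prepared {
        final Predicate<String> matcher;
        final String response;

        Prepared(Predicate<String> matcher, String response) {
            this.matcher = matcher;
            this.response = response;
        }
    }

    private final List<Prepared> prepared = new ArrayList<>();

    // Register a response together with a matcher for the request it answers.
    void prepareResponse(Predicate<String> matcher, String response) {
        prepared.add(new Prepared(matcher, response));
    }

    // Return (and consume) the first prepared response whose matcher accepts the
    // request, regardless of the order in which responses were prepared.
    String send(String request) {
        Iterator<Prepared> it = prepared.iterator();
        while (it.hasNext()) {
            Prepared p = it.next();
            if (p.matcher.test(request)) {
                it.remove();
                return p.response;
            }
        }
        throw new IllegalStateException("No prepared response matches " + request);
    }

    public static void main(String[] args) {
        OutOfOrderMockSketch mock = new OutOfOrderMockSketch();
        mock.prepareResponse(r -> r.equals("ListOffset"), "list-offset-response");
        mock.prepareResponse(r -> r.equals("Fetch"), "fetch-response");
        // The background thread may send Fetch before the application thread
        // sends ListOffset; both still receive the response prepared for them.
        System.out.println(mock.send("Fetch"));
        System.out.println(mock.send("ListOffset"));
    }
}
```

With a strict FIFO queue the first `send("Fetch")` would have received the ListOffset response, which is the kind of ordering sensitivity the description attributes to the two-thread consumer.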
[jira] [Commented] (KAFKA-16883) Zookeeper-Kraft failing migration - RPC got timed out before it could be sent
[ https://issues.apache.org/jira/browse/KAFKA-16883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869447#comment-17869447 ] David Arthur commented on KAFKA-16883: -- Thanks for confirming your success here [~nicolas.henneaux]. Looking at the log between 3.7.0 and 3.7.1, there were a handful of fixes related to metadata/controller and one fix specific to migrations (KAFKA-16563). > Zookeeper-Kraft failing migration - RPC got timed out before it could be sent > - > > Key: KAFKA-16883 > URL: https://issues.apache.org/jira/browse/KAFKA-16883 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0, 3.6.1, 3.6.2 >Reporter: Nicolas Henneaux >Priority: Major > Fix For: 3.7.1 > > > Despite several attempts to migrate from a Zookeeper cluster to Kraft, it > failed to properly migrate. > We spawned a new, fully healthy cluster with 3 Kafka nodes connected to 3 > Zookeeper nodes. 3 new Kafka nodes are there for the new controllers. > It was tested with Kafka 3.6.1, 3.6.2 and 3.7.0. > It might be linked to KAFKA-15330. > The controllers are started without issue. When the brokers are then > configured for the migration, the migration does not start. Once the last > broker is restarted, we get the following logs. > {code:java} > [2024-06-03 15:11:48,192] INFO [ReplicaFetcherThread-0-11]: Stopped > (kafka.server.ReplicaFetcherThread) > [2024-06-03 15:11:48,193] INFO [ReplicaFetcherThread-0-11]: Shutdown > completed (kafka.server.ReplicaFetcherThread) > {code} > Then we only get the following every 30s > {code:java} > [2024-06-03 15:12:04,163] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. (kafka.server.BrokerLifecycleManager) > [2024-06-03 15:12:34,297] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. 
(kafka.server.BrokerLifecycleManager) > [2024-06-03 15:13:04,536] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. (kafka.server.BrokerLifecycleManager){code} > The config on the controller node is the following > {code:java} > kafka0202e1 ~]$ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties | > grep -v password | sort > advertised.host.name=kafka0202e1.ahub.sb.eu.ginfra.net > broker.rack=e1 > controller.listener.names=CONTROLLER > controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093 > default.replication.factor=3 > delete.topic.enable=false > group.initial.rebalance.delay.ms=3000 > inter.broker.protocol.version=3.7 > listeners=CONTROLLER://kafka0202e1.ahub.sb.eu.ginfra.net:9093 > listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > log.dirs=/data/kafka > log.message.format.version=3.6 > log.retention.check.interval.ms=30 > log.retention.hours=240 > log.segment.bytes=1073741824 > min.insync.replicas=2 > node.id=20 > num.io.threads=8 > num.network.threads=3 > num.partitions=1 > num.recovery.threads.per.data.dir=1 > offsets.topic.replication.factor=3 > process.roles=controller > security.inter.broker.protocol=SSL > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > socket.send.buffer.bytes=102400 > ssl.cipher.suites=TLS_AES_256_GCM_SHA384 > ssl.client.auth=required > ssl.enabled.protocols=TLSv1.3 > ssl.endpoint.identification.algorithm=HTTPS > ssl.keystore.location=/etc/kafka/ssl/keystore.ts > ssl.keystore.type=JKS > ssl.secure.random.implementation=SHA1PRNG > ssl.truststore.location=/etc/kafka/ssl/truststore.ts > transaction.state.log.min.isr=3 > transaction.state.log.replication.factor=3 > unclean.leader.election.enable=false > 
zookeeper.connect=10.135.65.199:2181,10.133.65.199:2181,10.137.64.56:2181, > zookeeper.metadata.migration.enable=true > {code} > The config on the broker node is the following > {code} > $ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties | grep -v > password | sort > advertised.host.name=kafka0201e3.ahub.sb.eu.ginfra.net > advertised.listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092 > broker.id=12 > broker.rack=e3 > controller.listener.names=CONTROLLER # added once all controllers were started > controller.quor
[jira] [Closed] (KAFKA-16883) Zookeeper-Kraft failing migration - RPC got timed out before it could be sent
[ https://issues.apache.org/jira/browse/KAFKA-16883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur closed KAFKA-16883. > Zookeeper-Kraft failing migration - RPC got timed out before it could be sent > - > > Key: KAFKA-16883 > URL: https://issues.apache.org/jira/browse/KAFKA-16883 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0, 3.6.1, 3.6.2 >Reporter: Nicolas Henneaux >Priority: Major > Fix For: 3.7.1 > > > Despite several attempts to migrate from a Zookeeper cluster to Kraft, it > failed to properly migrate. > We spawned a new, fully healthy cluster with 3 Kafka nodes connected to 3 > Zookeeper nodes. 3 new Kafka nodes are there for the new controllers. > It was tested with Kafka 3.6.1, 3.6.2 and 3.7.0. > It might be linked to KAFKA-15330. > The controllers are started without issue. When the brokers are then > configured for the migration, the migration does not start. Once the last > broker is restarted, we get the following logs. > {code:java} > [2024-06-03 15:11:48,192] INFO [ReplicaFetcherThread-0-11]: Stopped > (kafka.server.ReplicaFetcherThread) > [2024-06-03 15:11:48,193] INFO [ReplicaFetcherThread-0-11]: Shutdown > completed (kafka.server.ReplicaFetcherThread) > {code} > Then we only get the following every 30s > {code:java} > [2024-06-03 15:12:04,163] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. (kafka.server.BrokerLifecycleManager) > [2024-06-03 15:12:34,297] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. (kafka.server.BrokerLifecycleManager) > [2024-06-03 15:13:04,536] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. 
(kafka.server.BrokerLifecycleManager){code} > The config on the controller node is the following > {code:java} > kafka0202e1 ~]$ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties | > grep -v password | sort > advertised.host.name=kafka0202e1.ahub.sb.eu.ginfra.net > broker.rack=e1 > controller.listener.names=CONTROLLER > controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093 > default.replication.factor=3 > delete.topic.enable=false > group.initial.rebalance.delay.ms=3000 > inter.broker.protocol.version=3.7 > listeners=CONTROLLER://kafka0202e1.ahub.sb.eu.ginfra.net:9093 > listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > log.dirs=/data/kafka > log.message.format.version=3.6 > log.retention.check.interval.ms=30 > log.retention.hours=240 > log.segment.bytes=1073741824 > min.insync.replicas=2 > node.id=20 > num.io.threads=8 > num.network.threads=3 > num.partitions=1 > num.recovery.threads.per.data.dir=1 > offsets.topic.replication.factor=3 > process.roles=controller > security.inter.broker.protocol=SSL > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > socket.send.buffer.bytes=102400 > ssl.cipher.suites=TLS_AES_256_GCM_SHA384 > ssl.client.auth=required > ssl.enabled.protocols=TLSv1.3 > ssl.endpoint.identification.algorithm=HTTPS > ssl.keystore.location=/etc/kafka/ssl/keystore.ts > ssl.keystore.type=JKS > ssl.secure.random.implementation=SHA1PRNG > ssl.truststore.location=/etc/kafka/ssl/truststore.ts > transaction.state.log.min.isr=3 > transaction.state.log.replication.factor=3 > unclean.leader.election.enable=false > zookeeper.connect=10.135.65.199:2181,10.133.65.199:2181,10.137.64.56:2181, > zookeeper.metadata.migration.enable=true > {code} > The config on the broker node is the following > {code} > $ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties | 
grep -v > password | sort > advertised.host.name=kafka0201e3.ahub.sb.eu.ginfra.net > advertised.listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092 > broker.id=12 > broker.rack=e3 > controller.listener.names=CONTROLLER # added once all controllers were started > controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093 > # added once all controllers were started > default.replication.factor=3 > delete.topic.enable=false > group.initial.rebalan
[jira] [Commented] (KAFKA-17146) ZK to KRAFT migration stuck in pre-migration mode
[ https://issues.apache.org/jira/browse/KAFKA-17146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869445#comment-17869445 ] David Arthur commented on KAFKA-17146: -- [~saimon46] thanks for the report. > More than that, the new controller claims to be the CONTROLLER but it refuses > to be it. This is by design, but the active KRaft controller should not stay in this state indefinitely. The KRaft controller will claim the ZK active controller role by writing its node ID into the /controller ZNode. It will then perform the migration of the metadata. There is a period of time where the active KRaft controller will be returning NOT_CONTROLLER to RPCs (while the migration is happening). Can you provide the full log for the active KRaft controller? If it's too large, just messages from KRaftMigrationDriver should shed some light on the situation. > ZK to KRAFT migration stuck in pre-migration mode > - > > Key: KAFKA-17146 > URL: https://issues.apache.org/jira/browse/KAFKA-17146 > Project: Kafka > Issue Type: Bug > Components: controller, kraft, migration >Affects Versions: 3.7.0, 3.7.1 > Environment: Virtual machines isolated: 3 VMs with Kafka brokers + 3 > Zookeeper/KRAFT >Reporter: Simone Brundu >Priority: Blocker > Labels: kraft, migration, zookeeper > > I'm facing a migration from Zookeeper to KRAFT with Kafka 3.7.1 cluster. 
> (EDIT: the same issue happens for version 3.7.0) > I'm using this configuration to allow SSL everywhere, with SCRAM > authentication only for brokers and PLAIN authentication for controllers > {code:java} > listener.security.protocol.map=EXTERNAL_SASL:SASL_SSL,CONTROLLER:SASL_SSL > inter.broker.listener.name=EXTERNAL_SASL > sasl.enabled.mechanisms=SCRAM-SHA-512,PLAIN > sasl.mechanism=SCRAM-SHA-512 > sasl.mechanism.controller.protocol=PLAIN > sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512 {code} > The cluster initially has 3 brokers and 3 Zookeeper nodes; a controller > cluster with 3 KRAFT controllers is then configured and run in parallel, as per > the documentation for the migration process. > I’ve started the migration with 3 controllers enrolled with SASL_SSL with > PLAIN authentication and I already see a strange TRACE log: > {code:java} > TRACE [KRaftMigrationDriver id=3000] Received metadata delta, but the > controller is not in dual-write mode. Ignoring the change to be replicated to > Zookeeper (org.apache.kafka.metadata.migration.KRaftMigrationDriver) {code} > This is followed later by this message, where KRAFT is waiting for brokers to connect > {code:java} > INFO [KRaftMigrationDriver id=1000] No brokers are known to KRaft, waiting > for brokers to register. 
> (org.apache.kafka.metadata.migration.KRaftMigrationDriver) {code} > As soon I start to reconfigure the brokers letting them to connect to the new > controllers, all good in the KRAFT controllers with notifications that the > KRAFT brokers were connecting correctly connected and enrolled > {code:java} > INFO [QuorumController id=1000] Replayed initial RegisterBrokerRecord for > broker 1: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=true, > incarnationId=xx, brokerEpoch=2638, > endPoints=[BrokerEndpoint(name='EXTERNAL_SASL', host='vmk-tdtkafka-01', > port=9095, securityProtocol=3)], > features=[BrokerFeature(name='metadata.version', minSupportedVersion=19, > maxSupportedVersion=19)], rack='zur1', fenced=true, > inControlledShutdown=false, logDirs=[xx]) > (org.apache.kafka.controller.ClusterControlManager) > [...] > INFO [KRaftMigrationDriver id=1000] Still waiting for ZK brokers [2, 3] to > register with KRaft. > (org.apache.kafka.metadata.migration.KRaftMigrationDriver) > [...] > INFO [KRaftMigrationDriver id=1000] Still waiting for ZK brokers [2] to > register with KRaft. > (org.apache.kafka.metadata.migration.KRaftMigrationDriver) {code} > As soon the first broker is connected we start to get these info logs related > to the migration process in the controller: > {code:java} > INFO [QuorumController id=1000] Cannot run write operation maybeFenceReplicas > in pre-migration mode. Returning NOT_CONTROLLER. > (org.apache.kafka.controller.QuorumController) > INFO [QuorumController id=1000] maybeFenceReplicas: event failed with > NotControllerException in 355 microseconds. Exception message: The controller > is in pre-migration mode. (org.apache.kafka.controller.QuorumController){code} > but as well requests to autocreate topics that exist already, in loop every > 30seconds, in the last broker restart
[jira] [Resolved] (KAFKA-15522) Flaky test org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs
[ https://issues.apache.org/jira/browse/KAFKA-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-15522. --- Resolution: Fixed > Flaky test > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs > -- > > Key: KAFKA-15522 > URL: https://issues.apache.org/jira/browse/KAFKA-15522 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0, 3.5.1 >Reporter: Josep Prat >Priority: Major > Labels: flaky, flaky-test > > h3. Last seen: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOneWayReplicationWithFrequentOffsetSyncs__/ > h3. Error Message > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out.{code} > h3. Stacktrace > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.startClusters(MirrorConnectorsIntegrationExactlyOnceTest.java:51) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > 
at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:521) > at > org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeBeforeEachMethodAdapter$23(ClassBasedTestDescriptor.java:506) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachMethods$3(TestMethodTestDescriptor.java:175) > at > org.junit.jupiter.engine.descriptor.TestMethod
[jira] [Commented] (KAFKA-15522) Flaky test org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs
[ https://issues.apache.org/jira/browse/KAFKA-15522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869429#comment-17869429 ] Chris Egerton commented on KAFKA-15522: --- We haven't seen this failure in a while and the stack trace in the description indicates it was probably a client/broker issue (or just extremely overloaded testing hardware). I think this is safe to close. > Flaky test > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.testOneWayReplicationWithFrequentOffsetSyncs > -- > > Key: KAFKA-15522 > URL: https://issues.apache.org/jira/browse/KAFKA-15522 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.6.0, 3.5.1 >Reporter: Josep Prat >Priority: Major > Labels: flaky, flaky-test > > h3. Last seen: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14458/3/testReport/junit/org.apache.kafka.connect.mirror.integration/MirrorConnectorsIntegrationExactlyOnceTest/Build___JDK_17_and_Scala_2_13___testOneWayReplicationWithFrequentOffsetSyncs__/ > h3. Error Message > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out.{code} > h3. Stacktrace > {code:java} > java.lang.RuntimeException: java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
at > org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:427) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.createTopics(MirrorConnectorsIntegrationBaseTest.java:1276) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:235) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.startClusters(MirrorConnectorsIntegrationBaseTest.java:149) > at > org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest.startClusters(MirrorConnectorsIntegrationExactlyOnceTest.java:51) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeEachMethod(TimeoutExtension.java:78) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > 
at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeMethodInExtensionContext(ClassBasedTestDescriptor.java:521) > at > org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$synthesizeBeforeEachMethodAdapter$23(ClassBasedTestDescriptor.java:506) > at > org.junit.jupiter.engine.descriptor.TestMetho
[jira] [Created] (KAFKA-17214) Add 3.8.0 Streams and Core to system tests
Josep Prat created KAFKA-17214: -- Summary: Add 3.8.0 Streams and Core to system tests Key: KAFKA-17214 URL: https://issues.apache.org/jira/browse/KAFKA-17214 Project: Kafka Issue Type: Bug Reporter: Josep Prat As per Release Instructions we should add 3.8.0 version to system tests. Example PRs: * Broker and clients: [https://github.com/apache/kafka/pull/12210] * Streams: [https://github.com/apache/kafka/pull/12209] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17213) Change ControllerMovedException from ApiException to InvalidMetadataException.
[ https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-17213: --- Fix Version/s: (was: 3.8.0) > Change ControllerMovedException from ApiException to InvalidMetadataException. > -- > > Key: KAFKA-17213 > URL: https://issues.apache.org/jira/browse/KAFKA-17213 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: xiaochen.zhou >Priority: Minor > > After the Kafka client fails to send a request, it updates its metadata. > {{InvalidMetadataException}} is retriable. I think the controller information > also belongs to Kafka's metadata, so {{ControllerMovedException}} should be > an {{InvalidMetadataException}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
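The effect of the proposed reparenting in KAFKA-17213 can be sketched with stand-in classes. None of the classes below are the real `org.apache.kafka.common.errors` types: the `*Sketch`, `ControllerMovedBefore` and `ControllerMovedAfter` names are hypothetical, and the `isRetriable` and `triggersMetadataRefresh` helpers are an assumed simplification of how a client might classify errors by type. The sketch only mirrors the hierarchy named in the issue, in which the invalid-metadata error is a retriable error.

```java
// Stand-in hierarchy; these are NOT the real Kafka exception classes.
class ApiExceptionSketch extends RuntimeException {
    ApiExceptionSketch(String message) { super(message); }
}

class RetriableExceptionSketch extends ApiExceptionSketch {
    RetriableExceptionSketch(String message) { super(message); }
}

class InvalidMetadataExceptionSketch extends RetriableExceptionSketch {
    InvalidMetadataExceptionSketch(String message) { super(message); }
}

// Current situation per the issue title: a direct child of the API exception.
class ControllerMovedBefore extends ApiExceptionSketch {
    ControllerMovedBefore(String message) { super(message); }
}

// Proposed situation: a child of the invalid-metadata exception.
class ControllerMovedAfter extends InvalidMetadataExceptionSketch {
    ControllerMovedAfter(String message) { super(message); }
}

class RetryClassificationSketch {
    // Assumed client-side rule: retry only errors of the retriable type.
    static boolean isRetriable(RuntimeException e) {
        return e instanceof RetriableExceptionSketch;
    }

    // Assumed client-side rule: refresh metadata on invalid-metadata errors.
    static boolean triggersMetadataRefresh(RuntimeException e) {
        return e instanceof InvalidMetadataExceptionSketch;
    }

    public static void main(String[] args) {
        RuntimeException before = new ControllerMovedBefore("controller moved");
        RuntimeException after = new ControllerMovedAfter("controller moved");
        System.out.println("before: retriable=" + isRetriable(before)
                + " refresh=" + triggersMetadataRefresh(before));
        System.out.println("after: retriable=" + isRetriable(after)
                + " refresh=" + triggersMetadataRefresh(after));
    }
}
```

Under these assumptions, changing the parent class is enough to make type-based retry and metadata-refresh checks pick up the controller-moved error without touching the call sites.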
[jira] [Resolved] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers
[ https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17202. Fix Version/s: 3.9.0 Resolution: Fixed > EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset > leaks consumers > -- > > Key: KAFKA-17202 > URL: https://issues.apache.org/jira/browse/KAFKA-17202 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: TengYao Chi >Priority: Minor > Labels: newbie > Fix For: 3.9.0 > > > This method creates a KafkaConsumer, but does not close it. > We can use a try-with-resources to ensure the consumer is closed prior to > returning or throwing from this function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
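The try-with-resources suggestion above can be sketched as follows. This is not the actual test code: `TrackingConsumer` is a hypothetical stand-in for `KafkaConsumer`, and `readMaxOffset` is an invented helper. The sketch only shows that the resource is closed on both the normal and the exceptional path.

```java
import java.util.concurrent.atomic.AtomicInteger;

class TryWithResourcesSketch {
    // Number of stand-in consumers that are currently open.
    static final AtomicInteger OPEN = new AtomicInteger();

    // Hypothetical stand-in for KafkaConsumer; only tracks open/closed state.
    static class TrackingConsumer implements AutoCloseable {
        TrackingConsumer() {
            OPEN.incrementAndGet();
        }

        long maxOffset(boolean fail) {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            return 42L;
        }

        @Override
        public void close() {
            OPEN.decrementAndGet();
        }
    }

    // The consumer is closed whether the body returns or throws.
    static long readMaxOffset(boolean fail) {
        try (TrackingConsumer consumer = new TrackingConsumer()) {
            return consumer.maxOffset(fail);
        }
    }

    public static void main(String[] args) {
        System.out.println("offset=" + readMaxOffset(false) + ", open=" + OPEN.get());
        try {
            readMaxOffset(true);
        } catch (IllegalStateException e) {
            System.out.println("threw, open=" + OPEN.get());
        }
    }
}
```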
[jira] [Commented] (KAFKA-4928) Add integration test for DumpLogSegments
[ https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869406#comment-17869406 ] Volk Huang commented on KAFKA-4928: --- [~chia7712] No problem, thank you for the tips! > Add integration test for DumpLogSegments > > > Key: KAFKA-4928 > URL: https://issues.apache.org/jira/browse/KAFKA-4928 > Project: Kafka > Issue Type: Test > Components: log, tools >Reporter: Ismael Juma >Assignee: Volk Huang >Priority: Major > Labels: newbie > > DumpLogSegments is an important tool to analyse log files, but we have no > JUnit tests for it. It would be good to have some tests that verify that the > output is sane for a populated log. > Our system tests call DumpLogSegments, but we should be able to detect > regressions via the JUnit test suite. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-4928) Add integration test for DumpLogSegments
[ https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869400#comment-17869400 ] Chia-Ping Tsai commented on KAFKA-4928: --- [~soravolk] I have assigned this Jira to you. Please add the integration test to `DumpLogSegmentsTest`, and please use the new test infrastructure, `ClusterTestExtensions`. Thanks! > Add integration test for DumpLogSegments > > > Key: KAFKA-4928 > URL: https://issues.apache.org/jira/browse/KAFKA-4928 > Project: Kafka > Issue Type: Test > Components: log, tools >Reporter: Ismael Juma >Assignee: Volk Huang >Priority: Major > Labels: newbie > > DumpLogSegments is an important tool to analyse log files, but we have no > JUnit tests for it. It would be good to have some tests that verify that the > output is sane for a populated log. > Our system tests call DumpLogSegments, but we should be able to detect > regressions via the JUnit test suite. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-4928) Add integration test for DumpLogSegments
[ https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-4928: - Assignee: Volk Huang > Add integration test for DumpLogSegments > > > Key: KAFKA-4928 > URL: https://issues.apache.org/jira/browse/KAFKA-4928 > Project: Kafka > Issue Type: Test > Components: log, tools >Reporter: Ismael Juma >Assignee: Volk Huang >Priority: Major > Labels: newbie > > DumpLogSegments is an important tool to analyse log files, but we have no > JUnit tests for it. It would be good to have some tests that verify that the > output is sane for a populated log. > Our system tests call DumpLogSegments, but we should be able to detect > regressions via the JUnit test suite. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17201) SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks sockets and threads
[ https://issues.apache.org/jira/browse/KAFKA-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17201: -- Assignee: TengYao Chi > SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks > sockets and threads > > > Key: KAFKA-17201 > URL: https://issues.apache.org/jira/browse/KAFKA-17201 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: TengYao Chi >Priority: Minor > Labels: flaky-test, newbie > > This test creates multiple Threads which in turn open sockets. It is possible > that these threads and sockets outlive the test itself, as the threads are > not joined before the test completes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
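One way to keep helper threads from outliving a test, as the ticket above describes, is to join every thread before the method returns. The sketch below is an assumption about the general shape of such a fix, not the change made to `SelectorTest`; `runAndJoin` and the 5-second bound are invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class JoinThreadsSketch {
    // Starts `count` workers and joins all of them before returning, so no
    // worker can still be running once the caller (e.g. a test) has finished.
    static int runAndJoin(int count) {
        AtomicInteger finished = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Thread t = new Thread(() -> finished.incrementAndGet(), "worker-" + i);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                // Bounded wait so a stuck worker fails the test instead of hanging it.
                t.join(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while joining " + t.getName(), e);
            }
            if (t.isAlive()) {
                throw new IllegalStateException("worker still running: " + t.getName());
            }
        }
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("finished workers: " + runAndJoin(4));
    }
}
```

In a real test the workers would also need to close the sockets they open, for example with try-with-resources inside the worker body.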
[jira] [Commented] (KAFKA-4928) Add integration test for DumpLogSegments
[ https://issues.apache.org/jira/browse/KAFKA-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869399#comment-17869399 ] Volk Huang commented on KAFKA-4928: --- Hi [~ijuma], can I try this one? Thank you so much. > Add integration test for DumpLogSegments > > > Key: KAFKA-4928 > URL: https://issues.apache.org/jira/browse/KAFKA-4928 > Project: Kafka > Issue Type: Test > Components: log, tools >Reporter: Ismael Juma >Priority: Major > Labels: newbie > > DumpLogSegments is an important tool to analyse log files, but we have no > JUnit tests for it. It would be good to have some tests that verify that the > output is sane for a populated log. > Our system tests call DumpLogSegments, but we should be able to detect > regressions via the JUnit test suite. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close
[ https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869396#comment-17869396 ] Chia-Ping Tsai commented on KAFKA-17116: [~dajac] Thanks for sharing the idea and details. The basic idea is that the client needs to know the UUID before the member is added to the group. IMHO both solutions have pros and cons. However, option 2 (using the first HB to get the UUID from the server) has to deal with compatibility, as an old server still adds the member to the group when handling the first HB. That is to say, the client needs to distinguish whether this member ID has been added to the group or not; otherwise, it may produce some weird log messages. By contrast, option 1 is non-invasive, since our server can already handle HBs with or without a client-defined member ID. Also, non-Java clients do not need to change any code if they don't care about this edge case. If they do care about it, Python, Go, .NET, and Rust have built-in UUID support, and there are many UUID implementations for C++ (uuid_v4 and Boost's uuid). Hence, I assume UUID support is not a big issue. > New consumer may not send effective leave group if member ID received after > close > -- > > Key: KAFKA-17116 > URL: https://issues.apache.org/jira/browse/KAFKA-17116 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Assignee: TengYao Chi >Priority: Major > Labels: kip-848-client-support > Fix For: 3.9.0 > > > If the new consumer is closed after sending a HB to join, but before > receiving the response to it, it will send a leave group request but without > member ID (will simply fail with UNKNOWN_MEMBER_ID). As a result, the > broker will have a newly registered member for which it will never receive a > leave request. 
> # consumer.subscribe -> sends HB to join, transitions to JOINING > # consumer.close -> will transition to LEAVING and send HB with epoch -1 > (without waiting for in-flight requests) > # consumer receives response to initial HB, containing the assigned member > ID. It will simply ignore it because it's not in the group anymore > (UNSUBSCRIBED) > Note that, with the current logic, the expected behavior and main downsides of this > are: > # If the member received partitions on the first HB, those > partitions won't be re-assigned (broker waiting for the closed consumer to > reconcile them), until the rebalance timeout expires. > # Even if no partitions were assigned to it, the member will remain in the > group from the broker point of view (but not from the client POV). The member > will be eventually kicked out for not sending HBs, but only when its session > timeout expires. -- This message was sent by Atlassian Jira (v8.20.10#820010)
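The three-step sequence in the description above can be replayed against a toy model to see why the member stays registered. This is a simplified sketch, not the consumer or group coordinator code: `ToyBroker`, its methods, and the `member-N` IDs are all hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

class LeaveBeforeMemberIdSketch {
    // Toy broker: registers a member on the first heartbeat and removes it
    // only when a leave heartbeat names a member ID it knows.
    static class ToyBroker {
        final Set<String> members = new HashSet<>();
        private int nextId = 0;

        String joinHeartbeat() {
            String id = "member-" + nextId++;
            members.add(id);
            return id;
        }

        // Returns false for an unknown ID, modelling UNKNOWN_MEMBER_ID.
        boolean leaveHeartbeat(String memberId) {
            return members.remove(memberId);
        }
    }

    static int membersLeftAfterEarlyClose() {
        ToyBroker broker = new ToyBroker();
        String knownToClient = ""; // the client has no member ID yet

        // 1. subscribe: the broker registers the member; the response is still in flight.
        String inFlightResponse = broker.joinHeartbeat();

        // 2. close: the leave heartbeat is sent without a member ID and is rejected.
        boolean leaveAccepted = broker.leaveHeartbeat(knownToClient);

        // 3. the response arrives, but the closed client ignores it.
        System.out.println("ignored " + inFlightResponse + ", leave accepted: " + leaveAccepted);
        return broker.members.size();
    }

    public static void main(String[] args) {
        System.out.println("members still registered: " + membersLeftAfterEarlyClose());
    }
}
```

In this model the stale member is only removed by a timeout, which mirrors the session-timeout downside listed in the description.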
[jira] [Updated] (KAFKA-17137) Ensure Admin APIs are properly tested
[ https://issues.apache.org/jira/browse/KAFKA-17137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kuan Po Tseng updated KAFKA-17137: -- Description: A number of Admin client APIs don't have integration tests. While testing 3.8.0 RC0 we discovered the Admin.describeTopics() API hung. This should have been caught by tests. I suggest to create subtasks for each API that needs tests. *Part 0* [https://github.com/apache/kafka/pull/16676] createTopics + retryOnQuotaViolation deleteTopics + timeoutMs deleteTopics + retryOnQuotaViolation listTopics + timeoutMs listTopics + listInternal describeTopics + timeoutMs describeTopics + partitionSizeLimitPerResponse *Part 1* [https://github.com/apache/kafka/pull/16648] describeAcls + timeoutMs createAcls + timeoutMs deleteAcls + timeoutMs describeConfigs + timeoutMs alterConfigs + timeoutMs createPartitions + retryOnQuotaViolation expireDelegationToken + expiryTimePeriodMs *Part 2* [https://github.com/apache/kafka/pull/16717] listConsumerGroups + withTypes listConsumerGroupOffsets(groupId) + requireStable listConsumerGroupOffsets(groupSpecs) + requireStable listConsumerGroupOffsets(groupSpecs) removeMembersFromConsumerGroup + reason alterConsumerGroupOffsets alterClientQuotas + validateOnly *Part 3* [https://github.com/apache/kafka/pull/16652] describeUserScramCredentials describeProducers + brokerId describeTransactions + timeoutMs abortTransaction + timeoutMs listTransactions + filterStates listTransactions + filterProducerIds listTransactions + filterOnDuration *Part 4* [https://github.com/apache/kafka/pull/16658] fenceProducers + timeoutMs listClientMetricsResources listClientMetricsResources + timeoutMs clientInstanceId addRaftVoter removeRaftVoter metrics was: A number of Admin client APIs don't have integration tests. While testing 3.8.0 RC0 we discovered the Admin.describeTopics() API hung. This should have been caught by tests. I suggest to create subtasks for each API that needs tests. 
*Part 0* [https://github.com/apache/kafka/pull/16676] createTopics + retryOnQuotaViolation deleteTopics + timeoutMs deleteTopics + retryOnQuotaViolation listTopics + timeoutMs listTopics + listInternal describeTopics + timeoutMs describeTopics + partitionSizeLimitPerResponse *Part 1* [https://github.com/apache/kafka/pull/16648] describeAcls + timeoutMs createAcls + timeoutMs deleteAcls + timeoutMs describeConfigs + timeoutMs alterConfigs + timeoutMs createPartitions + retryOnQuotaViolation expireDelegationToken + expiryTimePeriodMs *Part 2* listConsumerGroups + withTypes listConsumerGroupOffsets(groupId) + requireStable listConsumerGroupOffsets(groupSpecs) + requireStable listConsumerGroupOffsets(groupSpecs) removeMembersFromConsumerGroup + reason alterConsumerGroupOffsets alterClientQuotas + validateOnly *Part 3* [https://github.com/apache/kafka/pull/16652] describeUserScramCredentials describeProducers + brokerId describeTransactions + timeoutMs abortTransaction + timeoutMs listTransactions + filterStates listTransactions + filterProducerIds listTransactions + filterOnDuration *Part 4* [https://github.com/apache/kafka/pull/16658] fenceProducers + timeoutMs listClientMetricsResources listClientMetricsResources + timeoutMs clientInstanceId addRaftVoter removeRaftVoter metrics > Ensure Admin APIs are properly tested > - > > Key: KAFKA-17137 > URL: https://issues.apache.org/jira/browse/KAFKA-17137 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Mickael Maison >Assignee: Chia-Ping Tsai >Priority: Major > > A number of Admin client APIs don't have integration tests. While testing > 3.8.0 RC0 we discovered the Admin.describeTopics() API hung. This should have > been caught by tests. > I suggest to create subtasks for each API that needs tests. 
> *Part 0* > [https://github.com/apache/kafka/pull/16676] > createTopics + retryOnQuotaViolation > deleteTopics + timeoutMs > deleteTopics + retryOnQuotaViolation > listTopics + timeoutMs > listTopics + listInternal > describeTopics + timeoutMs > describeTopics + partitionSizeLimitPerResponse > *Part 1* > [https://github.com/apache/kafka/pull/16648] > describeAcls + timeoutMs > createAcls + timeoutMs > deleteAcls + timeoutMs > describeConfigs + timeoutMs > alterConfigs + timeoutMs > createPartitions + retryOnQuotaViolation > expireDelegationToken + expiryTimePeriodMs > *Part 2* > [https://github.com/apache/kafka/pull/16717] > listConsumerGroups + withTypes > listConsumerGroupOffsets(groupId) + requireStable > listConsumerGroupOffsets(groupSpecs) + requireStable > listConsumerGroupOffsets(groupSpecs) > removeMembersFromConsumerGroup + reason > alterConsumerGroupOffse
[jira] [Updated] (KAFKA-17200) Enable MM2 to replicate topics ending in "internal" suffix
[ https://issues.apache.org/jira/browse/KAFKA-17200?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-17200: -- Labels: needs-kip (was: ) > Enable MM2 to replicate topics ending in "internal" suffix > -- > > Key: KAFKA-17200 > URL: https://issues.apache.org/jira/browse/KAFKA-17200 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Patrik Márton >Assignee: Patrik Márton >Priority: Minor > Labels: needs-kip > > In the current Mirror Maker 2 implementation, topics ending in ".internal" or > "-internal" cannot be replicated as they are considered connect/mm2 internal > topics. > In some cases, users have business topics ending in ".internal" or > "-internal" that are excluded from the replication for the same reason. This > is because of two things: > (1) The ReplicationPolicy interface excludes all topics ending in ".internal" > or "-internal" from the replication, as they are considered internal connect > topics: > {code:java} > /** Internal topics are never replicated. */ > default boolean isInternalTopic(String topic) { > boolean isKafkaInternalTopic = topic.startsWith("__") || > topic.startsWith("."); > boolean isDefaultConnectTopic = topic.endsWith("-internal") || > topic.endsWith(".internal"); > return isMM2InternalTopic(topic) || isKafkaInternalTopic || > isDefaultConnectTopic; > } {code} > (2) The DefaultTopicFilter has the following default exclude regular > expression: > {code:java} > ".*[\\-\\.]internal, .*\\.replica, __.*" {code} > The goal of this ticket is to enable the replication of such business topics, > while making sure that kafka internal topics are not replicated. 
> *Solution 1:* The DefaultTopicFilter can be configured with a different > exclude list that uses a more specific regex, so that all Kafka internal topics are still excluded > but business topics with an internal suffix are included, e.g.: > {code:java} > "mm2.*[\\-\\.]internal, .*\\.replica, __.*"; {code} > As these topics are explicitly excluded in the ReplicationPolicy interface > too, we can add a new replication policy derived from DefaultReplicationPolicy that > preserves the same behavior but overrides the isInternalTopic() method so > that topics ending with the internal suffix are not considered internal > topics. In this case it is important to set up the topic > filter correctly to avoid replicating Kafka internal topics. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
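To see what the narrower exclude list in Solution 1 changes, the two regex lists quoted in the ticket can be compared on a few topic names. This is a sketch under assumptions: `isExcluded` is a hypothetical helper that splits the list on commas and requires a full match, which may differ from how MM2 evaluates its filter, and the topic names are invented.

```java
import java.util.regex.Pattern;

class TopicExcludeSketch {
    // Exclude lists as quoted in the ticket (Java string escaping included).
    static final String DEFAULT_EXCLUDE = ".*[\\-\\.]internal, .*\\.replica, __.*";
    static final String NARROWED_EXCLUDE = "mm2.*[\\-\\.]internal, .*\\.replica, __.*";

    // Hypothetical helper: a topic is excluded if any regex in the
    // comma-separated list matches the whole topic name.
    static boolean isExcluded(String excludeList, String topic) {
        for (String regex : excludeList.split(",")) {
            if (Pattern.matches(regex.trim(), topic)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] topics = {"orders.internal", "payments-internal",
                           "mm2-offsets.A.internal", "__consumer_offsets"};
        for (String topic : topics) {
            System.out.println(topic
                    + " default=" + isExcluded(DEFAULT_EXCLUDE, topic)
                    + " narrowed=" + isExcluded(NARROWED_EXCLUDE, topic));
        }
    }
}
```

Changing the filter alone is not sufficient: as the ticket notes, the `isInternalTopic()` check in the replication policy excludes the same suffixes, so it would need to be overridden as well.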
[jira] [Assigned] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients
[ https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-17207: - Assignee: Xuze Yang (was: Chris Egerton) > ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking > clients > - > > Key: KAFKA-17207 > URL: https://issues.apache.org/jira/browse/KAFKA-17207 > Project: Kafka > Issue Type: Test > Components: connect >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: Xuze Yang >Priority: Minor > Labels: newbie > > The testRequestTimeouts deletes the internal config topic, putting the > connect workers into a bad state. When the test goes to clean up, it calls > DistributedHerder#stop, which waits for the herder executor to stop. This > times out, because the herder executor is blocked closing the > KafkaConfigBackingStore's producer. This log message gets printed: > {noformat} > [2024-07-26 11:52:50,817] ERROR Executor > java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = > 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not > terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat} > This effectively leaks the Kafka clients for the workers' internal topics, > and the herder executor thread. Instead, either the producer should not block > indefinitely on a missing topic, or the cluster state should be healed enough > for the producer to shutdown cleanly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients
[ https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869375#comment-17869375 ] Chris Egerton commented on KAFKA-17207: --- Whoops, Jira hotkeys conspired against me! Did not mean to assign this to myself. Hi [~Xuze Yang] it's all yours, enjoy! > ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking > clients > - > > Key: KAFKA-17207 > URL: https://issues.apache.org/jira/browse/KAFKA-17207 > Project: Kafka > Issue Type: Test > Components: connect >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: Chris Egerton >Priority: Minor > Labels: newbie > > The testRequestTimeouts deletes the internal config topic, putting the > connect workers into a bad state. When the test goes to clean up, it calls > DistributedHerder#stop, which waits for the herder executor to stop. This > times out, because the herder executor is blocked closing the > KafkaConfigBackingStore's producer. This log message gets printed: > {noformat} > [2024-07-26 11:52:50,817] ERROR Executor > java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = > 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not > terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat} > This effectively leaks the Kafka clients for the workers' internal topics, > and the herder executor thread. Instead, either the producer should not block > indefinitely on a missing topic, or the cluster state should be healed enough > for the producer to shutdown cleanly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients
[ https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-17207: - Assignee: Chris Egerton > ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking > clients > - > > Key: KAFKA-17207 > URL: https://issues.apache.org/jira/browse/KAFKA-17207 > Project: Kafka > Issue Type: Test > Components: connect >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: Chris Egerton >Priority: Minor > Labels: newbie > > The testRequestTimeouts deletes the internal config topic, putting the > connect workers into a bad state. When the test goes to clean up, it calls > DistributedHerder#stop, which waits for the herder executor to stop. This > times out, because the herder executor is blocked closing the > KafkaConfigBackingStore's producer. This log message gets printed: > {noformat} > [2024-07-26 11:52:50,817] ERROR Executor > java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = > 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not > terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat} > This effectively leaks the Kafka clients for the workers' internal topics, > and the herder executor thread. Instead, either the producer should not block > indefinitely on a missing topic, or the cluster state should be healed enough > for the producer to shutdown cleanly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17044) Connector deletion can lead to resource leak during a long running connector startup
[ https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869372#comment-17869372 ] Chris Egerton commented on KAFKA-17044: --- [~bgoyal] pinging again, please let me know if this can be closed. > Connector deletion can lead to resource leak during a long running connector > startup > > > Key: KAFKA-17044 > URL: https://issues.apache.org/jira/browse/KAFKA-17044 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Bhagyashree >Priority: Major > > We have identified a gap in the shutdown flow for the connector worker. If > the connector is in > [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404] > state and still executing the > [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218] > method, a DELETE API call would invoke the > [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298] > and [notify() > |https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297]but > the connector worker would not shutdown immediately. 
This happens because > [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] > is a blocking call and the control reaches > [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176] > in > [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151] > after the > [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] > call has completed. This results in a gap in the delete flow where the > connector is not immediately shutdown leaving the resources running. > [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] > keeps running and only when the execution of > [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] > completes, we reach at the point of > [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176] > and then > [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183] > of the connector worker is invoked. > This seems similar to what has been identified for connector tasks as part of > https://issues.apache.org/jira/browse/KAFKA-14725. > *Steps to repro* > 1. Start a connector with time taking operation in > [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] > call > 2. Call DELETE API to delete this connector > 3. 
The connector is deleted only after the > [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] > call completes. > The issue was observed when a connector was configured to retry a DB > connection for some time. > {*}Current Behaviour{*}: The connector did not shut down until the > [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216] > method completed. > {*}Expected Behaviour{*}: The connector should abort what it is doing and > shut down as requested by the DELETE call. -- This message was sent by Atlassian Jira (v8.20.10#820010)
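The ordering problem described in this report can be reproduced in miniature: a worker thread that only checks for a stop request after a blocking `start()` returns cannot react to a shutdown that arrives in the meantime. The sketch below is a toy model, not `WorkerConnector`; the latches, the `stopping` flag, and the event names are all invented for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

class BlockingStartSketch {
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch startEntered = new CountDownLatch(1);
        CountDownLatch releaseStart = new CountDownLatch(1);
        Object lock = new Object();
        boolean[] stopping = {false}; // guarded by `lock`

        Thread worker = new Thread(() -> {
            events.add("start() begins");
            startEntered.countDown();
            try {
                releaseStart.await(); // models a slow, blocking connector start()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("start() ends");
            synchronized (lock) {
                // The stop request is only looked at here, after start() returned.
                while (!stopping[0]) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            events.add("shutdown runs");
        });
        worker.start();
        try {
            startEntered.await();
            synchronized (lock) {
                stopping[0] = true; // the DELETE request arrives while start() is blocked
                lock.notify();
            }
            events.add("shutdown requested");
            releaseStart.countDown(); // start() finally completes
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The recorded order shows the shutdown request being accepted before `start()` ends but acted on only afterwards, which is the gap the report asks to close.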
[jira] [Commented] (KAFKA-17116) New consumer may not send effective leave group if member ID received after close
[ https://issues.apache.org/jira/browse/KAFKA-17116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869371#comment-17869371 ] David Jacot commented on KAFKA-17116: - Hi all, I was thinking about two options to resolve this issue: # We could actually generate the member ID on the client side. This is already supported by the protocol. Back then, we rejected this option because it was considered too difficult for non-Java clients such as librdkafka, as some languages do not have built-in UUID support. This may be better now. We can check this. We also need to convince ourselves that letting the client generate it is good enough. # We could use the first HB sent by the client to generate the UUID without adding the member to the group yet. The response to the first HB would carry the new member ID, zero as the member epoch, and zero as the heartbeat interval, so that the client heartbeats again immediately. This is basically a way to do 1. but without relying on the client to generate it. Note that this does not require any changes to the protocol either. An alternative to this would be to return a new error code to make it explicit, but I am not sure if it is worth it. I am not a fan of the idea of passing a temporary ID on the client that we would store on the server. If we believe that we could generate an ID on the client, we should just do 1. I think that we could also improve the client state machine to better handle those cases. 
> New consumer may not send effective leave group if member ID received after > close > -- > > Key: KAFKA-17116 > URL: https://issues.apache.org/jira/browse/KAFKA-17116 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Lianet Magrans >Assignee: TengYao Chi >Priority: Major > Labels: kip-848-client-support > Fix For: 3.9.0 > > > If the new consumer is closed after sending a HB to join, but before > receiving the response to it, it will send a leave group request but without > member ID (will simply fail with UNKNOWN_MEMBER_ID). As a result, the > broker will have a newly registered member for which it will never receive a > leave request. > # consumer.subscribe -> sends HB to join, transitions to JOINING > # consumer.close -> will transition to LEAVING and send HB with epoch -1 > (without waiting for in-flight requests) > # consumer receives response to initial HB, containing the assigned member > ID. It will simply ignore it because it's not in the group anymore > (UNSUBSCRIBED) > Note that, with the current logic, the expected behavior and main downsides of this > are: > # If the member received partitions on the first HB, those > partitions won't be re-assigned (broker waiting for the closed consumer to > reconcile them), until the rebalance timeout expires. > # Even if no partitions were assigned to it, the member will remain in the > group from the broker point of view (but not from the client POV). The member > will be eventually kicked out for not sending HBs, but only when its session > timeout expires. -- This message was sent by Atlassian Jira (v8.20.10#820010)
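Option 1 in the comment above, generating the member ID on the client, means the ID is known before the first heartbeat is sent, so a leave request issued during an early close can still name the member. The sketch below illustrates only that property. It uses `java.util.UUID` as an assumption (the real client may use a different UUID type or encoding), and `ToyBroker` is a hypothetical model rather than the group coordinator.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

class ClientMemberIdSketch {
    // Toy broker that accepts a client-supplied member ID on join.
    static class ToyBroker {
        final Set<String> members = new HashSet<>();

        void joinHeartbeat(String memberId) {
            members.add(memberId);
        }

        boolean leaveHeartbeat(String memberId) {
            return members.remove(memberId);
        }
    }

    // Assumption: a random UUID string is an acceptable member ID.
    static String newMemberId() {
        return UUID.randomUUID().toString();
    }

    static int membersLeftAfterEarlyClose() {
        ToyBroker broker = new ToyBroker();
        String memberId = newMemberId();  // known before any request is sent
        broker.joinHeartbeat(memberId);   // subscribe: first heartbeat carries the ID
        broker.leaveHeartbeat(memberId);  // close: leave can name the member right away
        return broker.members.size();
    }

    public static void main(String[] args) {
        System.out.println("members still registered: " + membersLeftAfterEarlyClose());
    }
}
```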
[jira] [Assigned] (KAFKA-16263) Add Kafka Streams docs about available listeners/callback
[ https://issues.apache.org/jira/browse/KAFKA-16263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] zhengke zhou reassigned KAFKA-16263: Assignee: zhengke zhou (was: Kuan Po Tseng) > Add Kafka Streams docs about available listeners/callback > - > > Key: KAFKA-16263 > URL: https://issues.apache.org/jira/browse/KAFKA-16263 > Project: Kafka > Issue Type: Task > Components: docs, streams >Reporter: Matthias J. Sax >Assignee: zhengke zhou >Priority: Minor > Labels: beginner, newbie > > Kafka Streams allows registering all kinds of listeners and callbacks (e.g., > uncaught-exception-handler, restore-listeners, etc.), but those are not in the > documentation. > A good place might be > [https://kafka.apache.org/documentation/streams/developer-guide/running-app.html] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17213) Change ControllerMovedException from ApiException to InvalidMetadataException.
[ https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaochen.zhou updated KAFKA-17213: -- Fix Version/s: 3.8.0 (was: 3.7.0) > Change ControllerMovedException from ApiException to InvalidMetadataException. > -- > > Key: KAFKA-17213 > URL: https://issues.apache.org/jira/browse/KAFKA-17213 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: xiaochen.zhou >Priority: Minor > Fix For: 3.8.0 > > > After the Kafka client fails to send, it will update the metadata. The > {{InvalidMetadataException}} is retryable. I think the Controller information > also belongs to Kafka's metadata, so {{ControllerMovedException}} should be > {{{}InvalidMetadataException{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17213) Change ControllerMovedException from ApiException to InvalidMetadataException.
[ https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaochen.zhou updated KAFKA-17213: -- Component/s: clients Fix Version/s: 3.7.0 Description: After the Kafka client fails to send, it will update the metadata. The {{InvalidMetadataException}} is retryable. I think the Controller information also belongs to Kafka's metadata, so {{ControllerMovedException}} should be {{{}InvalidMetadataException{}}}. Issue Type: Improvement (was: New Feature) Priority: Minor (was: Major) Summary: Change ControllerMovedException from ApiException to InvalidMetadataException. (was: Make ) > Change ControllerMovedException from ApiException to InvalidMetadataException. > -- > > Key: KAFKA-17213 > URL: https://issues.apache.org/jira/browse/KAFKA-17213 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: xiaochen.zhou >Priority: Minor > Fix For: 3.7.0 > > > After the Kafka client fails to send, it will update the metadata. The > {{InvalidMetadataException}} is retryable. I think the Controller information > also belongs to Kafka's metadata, so {{ControllerMovedException}} should be > {{{}InvalidMetadataException{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17213) Make
[ https://issues.apache.org/jira/browse/KAFKA-17213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xiaochen.zhou updated KAFKA-17213: -- Summary: Make (was: Make ) > Make > -- > > Key: KAFKA-17213 > URL: https://issues.apache.org/jira/browse/KAFKA-17213 > Project: Kafka > Issue Type: New Feature >Reporter: xiaochen.zhou >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17213) Make
xiaochen.zhou created KAFKA-17213: - Summary: Make Key: KAFKA-17213 URL: https://issues.apache.org/jira/browse/KAFKA-17213 Project: Kafka Issue Type: New Feature Reporter: xiaochen.zhou -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-17177) reviewers.py should grep "authors" to offer more candidates of reviewers information
[ https://issues.apache.org/jira/browse/KAFKA-17177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chia-Ping Tsai resolved KAFKA-17177.
Fix Version/s: 3.9.0
Resolution: Fixed

> reviewers.py should grep "authors" to offer more candidates of reviewers
> information
>
> Key: KAFKA-17177
> URL: https://issues.apache.org/jira/browse/KAFKA-17177
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia Chuan Yu
> Priority: Minor
> Fix For: 3.9.0
>
> It seems to me that an `author` should be a qualified reviewer too. Also, an
> `author` normally has an "email", which is sometimes hard to find on the
> GitHub page.
[jira] [Commented] (KAFKA-17211) kafka producer get stuck by Uncaught error in kafka producer I/O thread
[ https://issues.apache.org/jira/browse/KAFKA-17211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869311#comment-17869311 ]

yapeng commented on KAFKA-17211:

I noticed that the producer sender thread polls elements from the Deque while the producer client may invoke RecordAccumulator.append. Both actions can change the head and tail of the ArrayDeque. Although the accesses are synchronized, it can still have race conditions because head and tail are not declared volatile in ArrayDeque.
{code:java}
public class ArrayDeque<E> extends AbstractCollection<E>
        implements Deque<E>, Cloneable, Serializable {
    /**
     * The array in which the elements of the deque are stored.
     * The capacity of the deque is the length of this array, which is
     * always a power of two. The array is never allowed to become
     * full, except transiently within an addX method where it is
     * resized (see doubleCapacity) immediately upon becoming full,
     * thus avoiding head and tail wrapping around to equal each
     * other. We also guarantee that all array cells not holding
     * deque elements are always null.
     */
    transient Object[] elements; // non-private to simplify nested class access

    /**
     * The index of the element at the head of the deque (which is the
     * element that would be removed by remove() or pop()); or an
     * arbitrary number equal to tail if the deque is empty.
     */
    transient int head;

    /**
     * The index at which the next element would be added to the tail
     * of the deque (via addLast(E), add(E), or push(E)).
     */
    transient int tail;
    // ...
}
{code}
Could this possibly be the cause of the bug?

> kafka producer get stuck by Uncaught error in kafka producer I/O thread
> ---
>
> Key: KAFKA-17211
> URL: https://issues.apache.org/jira/browse/KAFKA-17211
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.8.2
> Reporter: yapeng
> Priority: Major
>
> I have a Kafka producer that kept reporting this error message indefinitely.
> {code:java}
> [Producer clientId=xxx] Uncaught error in kafka producer I/O thread:
> java.util.NoSuchElementException: null
>     at java.util.ArrayDeque.getFirst(ArrayDeque.java:329)
>     at org.apache.kafka.clients.producer.internals.RecordAccumulator.expiredBatches(RecordAccumulator.java:306)
>     at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:372)
>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:326)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:255)
>     at java.lang.Thread.run(Thread.java:750){code}
> It seems weird that there are null values in the Deque. The machine has been
> restarted to resolve the production issue, so there is no heap dump.
> Please help with some insights into this issue.
[jira] [Created] (KAFKA-17212) Segments containing a single message can be incorrectly marked as local only
Guillaume Mallet created KAFKA-17212:

Summary: Segments containing a single message can be incorrectly marked as local only
Key: KAFKA-17212
URL: https://issues.apache.org/jira/browse/KAFKA-17212
Project: Kafka
Issue Type: Bug
Components: Tiered-Storage
Affects Versions: 3.7.1, 3.8.0, 3.9.0
Reporter: Guillaume Mallet

There is an edge case in which a segment containing a single message is considered local only, which skews the deletion process towards deleting more data.

*This is very unlikely to happen in a real scenario but can happen in tests when segments are rolled manually.*
*It could possibly happen when segments are rolled based on time, but even then the skew would be minimal.*

h2. What happens

In order to delete the right amount of data against the byte retention policy, we first count, in the [buildRetentionSizeData|https://github.com/apache/kafka/blob/09be14bb09dc336f941a7859232094bfb3cb3b96/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1335] function, all the bytes that are breaching {{retention.bytes}}. To do this, the size of each segment is added to the size of the segments present only on disk, {{onlyLocalLogSegmentsSize}}.

Listing the segments present only on disk is done through the function [onlyLocalLogSegmentSize|https://github.com/apache/kafka/blob/a0f6e6f816c6ac3fbbc4e0dc503dc43bfacfe6c7/core/src/main/scala/kafka/log/UnifiedLog.scala#L1618-L1619], which adds up the size of each segment that has a _baseOffset_ greater than or equal to {{highestOffsetInRemoteStorage}}.

{{highestOffsetInRemoteStorage}} is the highest offset that has been successfully sent to the remote store.

The _baseOffset_ of a segment is “a [lower bound (*inclusive*) of the offset in the segment”|https://github.com/apache/kafka/blob/a0f6e6f816c6ac3fbbc4e0dc503dc43bfacfe6c7/storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java#L115].

In the case of a segment with a single message, the baseOffset can be equal to _highestOffsetInRemoteStorage_, which means that despite the offset having been offloaded to the RemoteStorage, we would count that segment as local only.

This has consequences when counting the bytes to delete, as we count the size of this segment twice in [buildRetentionSizeData|https://github.com/apache/kafka/blob/09be14bb09dc336f941a7859232094bfb3cb3b96/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1155]: once as a segment offloaded to the RemoteStorage and once as a local segment when [onlyLocalSegmentSize|https://github.com/apache/kafka/blob/a0f6e6f816c6ac3fbbc4e0dc503dc43bfacfe6c7/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1361-L1363] is added.

The result is that {{remainingBreachedSize}} will be higher than expected, which can lead to more bytes being deleted than we would initially expect, up to the size of the double-counted segment.

The issue is due to the fact that we are using a greater-than-or-equal comparison rather than a strictly-greater-than one. A segment present only locally will have a {{baseOffset}} strictly greater than {{highestOffsetInRemoteStorage}}.

h2. Reproducing the issue

The problem is highlighted in the 2 tests added in this [commit|https://github.com/apache/kafka/commit/97af351db517d69a2b37c92861e463a6d0c5cb8f]
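The double counting described in this report can be illustrated with a small, self-contained sketch. This is not the code from `UnifiedLog` or `RemoteLogManager`: the `Segment` class, the method names, and the sample offsets and sizes are all hypothetical and only model the comparison on `baseOffset`.

```java
import java.util.Arrays;
import java.util.List;

class LocalOnlySizeSketch {
    /** Hypothetical stand-in for a log segment: base offset plus size in bytes. */
    static final class Segment {
        final long baseOffset;
        final long sizeInBytes;

        Segment(long baseOffset, long sizeInBytes) {
            this.baseOffset = baseOffset;
            this.sizeInBytes = sizeInBytes;
        }
    }

    /** Models the reported behaviour: baseOffset >= highestOffsetInRemoteStorage. */
    static long inclusiveLocalOnlySize(List<Segment> segments, long highestOffsetInRemoteStorage) {
        long total = 0;
        for (Segment s : segments) {
            if (s.baseOffset >= highestOffsetInRemoteStorage) {
                total += s.sizeInBytes;
            }
        }
        return total;
    }

    /** Models the strict comparison implied by the report: baseOffset > highestOffsetInRemoteStorage. */
    static long strictLocalOnlySize(List<Segment> segments, long highestOffsetInRemoteStorage) {
        long total = 0;
        for (Segment s : segments) {
            if (s.baseOffset > highestOffsetInRemoteStorage) {
                total += s.sizeInBytes;
            }
        }
        return total;
    }

    /** Made-up sample data: two uploaded segments and one that is truly local only. */
    static List<Segment> exampleSegments() {
        return Arrays.asList(
                new Segment(0, 100),   // offsets 0..9, already uploaded
                new Segment(10, 50),   // single message at offset 10, already uploaded
                new Segment(11, 70));  // not uploaded yet
    }

    public static void main(String[] args) {
        long highestOffsetInRemoteStorage = 10;
        List<Segment> segments = exampleSegments();
        System.out.println(inclusiveLocalOnlySize(segments, highestOffsetInRemoteStorage)); // 120
        System.out.println(strictLocalOnlySize(segments, highestOffsetInRemoteStorage));    // 70
    }
}
```

With this sample data the inclusive comparison counts the 50-byte single-message segment as local even though it was uploaded, which is the extra amount that would later be added on top of the remote size.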
[jira] [Commented] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869299#comment-17869299 ] Chia-Ping Tsai commented on KAFKA-16390: [~frankvicky] thanks for taking over this. Please feel free to file PR for it > consume_bench_test.py failed using AsyncKafkaConsumer > - > > Key: KAFKA-16390 > URL: https://issues.apache.org/jira/browse/KAFKA-16390 > Project: Kafka > Issue Type: Task > Components: consumer, system tests >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > Labels: kip-848-client-support > > Ran the system test based on KAFKA-16273 > The following tests failed using the consumer group protocol > {code:java} > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_consume_bench.topics=.consume_bench_topic.0-5.0-4.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_multiple_consumers_random_group_partitions.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > kafkatest.tests.core.consume_bench_test.ConsumeBenchTest.test_single_partition.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > {code} > Because of > {code:java} > TimeoutError('consume_workload failed to finish in the expected amount of > time.') > Traceback (most recent call last): > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 186, in _do_run > data = self.run_test() > File > "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", > line 246, in run_test > return self.test_context.function(self.test) > File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line > 433, in wrapper > return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) > File "/opt/kafka-dev/tests/kafkatest/tests/core/consume_bench_test.py", > line 146, in test_single_partition > 
consume_workload.wait_for_done(timeout_sec=180) > File "/opt/kafka-dev/tests/kafkatest/services/trogdor/trogdor.py", line > 352, in wait_for_done > wait_until(lambda: self.done(), > File "/usr/local/lib/python3.9/dist-packages/ducktape/utils/util.py", line > 58, in wait_until > raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from > last_exception > ducktape.errors.TimeoutError: consume_workload failed to finish in the > expected amount of time. {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16390) consume_bench_test.py failed using AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-16390?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869293#comment-17869293 ]

TengYao Chi commented on KAFKA-16390:

Hello [~kirktrue] I have some ideas regarding this issue. If you haven’t started working on it yet, I’d like to give it a try. :D
[jira] [Created] (KAFKA-17211) kafka producer get stuck by Uncaught error in kafka producer I/O thread
yapeng created KAFKA-17211:

Summary: kafka producer get stuck by Uncaught error in kafka producer I/O thread
Key: KAFKA-17211
URL: https://issues.apache.org/jira/browse/KAFKA-17211
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.8.2
Reporter: yapeng

I have a Kafka producer that kept reporting this error message indefinitely.
{code:java}
[Producer clientId=xxx] Uncaught error in kafka producer I/O thread:
java.util.NoSuchElementException: null
    at java.util.ArrayDeque.getFirst(ArrayDeque.java:329)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.expiredBatches(RecordAccumulator.java:306)
    at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:372)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:326)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:255)
    at java.lang.Thread.run(Thread.java:750){code}
It seems weird that there are null values in the Deque. The machine has been restarted to resolve the production issue, so there is no heap dump. Please help with some insights into this issue.
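For context on the stack trace above: `ArrayDeque.getFirst()` throws `NoSuchElementException` whenever the deque is empty, whereas `peekFirst()` returns `null` in the same situation. The trailing `null` in the log line may therefore simply be the exception's absent message rather than a null element stored in the deque; that reading is an assumption about how the logger formats the exception. The sketch below is not Kafka code, and `EmptyDequeDemo` and `getFirstThrows` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;

class EmptyDequeDemo {
    /** Returns true when getFirst() on the given deque throws NoSuchElementException. */
    static boolean getFirstThrows(ArrayDeque<String> deque) {
        try {
            deque.getFirst();
            return false;
        } catch (NoSuchElementException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ArrayDeque<String> batches = new ArrayDeque<>();
        // Empty deque: getFirst() throws, peekFirst() returns null.
        System.out.println("getFirst throws on empty: " + getFirstThrows(batches));
        System.out.println("peekFirst on empty: " + batches.peekFirst());

        // Non-empty deque: getFirst() returns the head element.
        batches.addLast("batch-1");
        System.out.println("getFirst throws on non-empty: " + getFirstThrows(batches));
    }
}
```

This only shows when the exception is raised; it does not explain how the accumulator's deque ended up empty at the point where `getFirst()` was called.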
[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
[ https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869256#comment-17869256 ] Simone Brundu commented on KAFKA-15330: --- I completely agree with having a clean cluster for the migration, without any AUTH/SCRAM/SSL. The problem is that removing all these extra configs will require a short downtime before and after the migration. I can give it a try with a cluster without any auth, but I need to find a way to run the basic PLAINTEXT protocol unauthenticated. > Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards > > > Key: KAFKA-15330 > URL: https://issues.apache.org/jira/browse/KAFKA-15330 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0, 3.5.1 > Environment: Debian Bookworm/12.1 > kafka 3.4 and 3.5 / scala 2.13 > OpenJDK Runtime Environment (build 17.0.8+7-Debian-1deb12u1) >Reporter: Roland Sommer >Priority: Major > Attachments: broker.properties, controller.properties > > > We recently did some migration testing from our old ZK-based kafka clusters > to KRaft while still being on kafka 3.4. The migration tests succeeded at > first try. In the meantime we updated to kafka 3.5/3.5.1 and now we wanted to > continue our migration work, which ran into unexpected problems. > On the controller we get messages like: > {code:java} > Aug 10 06:49:33 kafkactl01 kafka-server-start.sh[48572]: [2023-08-10 > 06:49:33,072] INFO [KRaftMigrationDriver id=495] Still waiting for all > controller nodes ready to begin the migration. due to: Missing apiVersion > from nodes: [514, 760] > (org.apache.kafka.metadata.migration.KRaftMigrationDriver){code} > On the broker side, we see: > {code:java} > 06:52:56,109] INFO [BrokerLifecycleManager id=6 isZkBroker=true] Unable to > register the broker because the RPC got timed out before it could be sent. 
> (kafka.server.BrokerLifecycleManager){code} > If we reinstall the same development cluster with kafka 3.4, using the exact > same steps provided by your migration documentation (only difference is using > {{inter.broker.protocol.version=3.4}} instead of > {{{}inter.broker.protocol.version=3.5{}}}), everything works as expected. > Updating to kafka 3.5/3.5.1 yields the same problems. > Testing is done on a three-node kafka cluster with a three-node zookeeper > ensemble and a three-node controller setup. > Besides our default configuration containing the active zookeeper hosts etc., > this is what was added on the brokers: > {code:java} > # Migration > advertised.listeners=PLAINTEXT://kafka03:9092 > listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT > zookeeper.metadata.migration.enable=true > controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093 > controller.listener.names=CONTROLLER > {code} > The main controller config looks like this: > {code:java} > process.roles=controller > node.id=495 > controller.quorum.voters=495@kafkactl01:9093,760@kafkactl02:9093,514@kafkactl03:9093 > listeners=CONTROLLER://:9093 > inter.broker.listener.name=PLAINTEXT > controller.listener.names=CONTROLLER > listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT > zookeeper.metadata.migration.enable=true > {code} > Both configs contain the identical {{zookeeper.connect}} settings, everything > is setup automatically so it should be identical on every run and we can > reliably reproduce migration success on kafka 3.4 and migration failure using > the same setup with kafka 3.5. > There are other issues mentioning problems with ApiVersions like KAFKA-15230 > - not quite sure if this is a duplicate of the underlying problem there. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
[ https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869253#comment-17869253 ]

Roland Sommer commented on KAFKA-15330:

[~saimon46] our config is basically what I attached earlier in this ticket. As the affected clusters run in an isolated environment, we refrained from additional measures like auth/SCRAM/SSL.
[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
[ https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869252#comment-17869252 ]

Simone Brundu commented on KAFKA-15330:

[~rsommer], how did you manage to make the migration successful? For us, with 3.7.1 we have this issue KAFKA-17146. Did you have any authentication, SCRAM, SSL enabled in your cluster? Maybe something fails silently during the migration and we don't notice it. Can you post here the configuration in the brokers and in the KRAFT controllers?
[jira] [Created] (KAFKA-17210) Broker fixes for smooth concurrent fetches on share partition
Abhinav Dixit created KAFKA-17210:

Summary: Broker fixes for smooth concurrent fetches on share partition
Key: KAFKA-17210
URL: https://issues.apache.org/jira/browse/KAFKA-17210
Project: Kafka
Issue Type: Sub-task
Reporter: Abhinav Dixit
Assignee: Abhinav Dixit

Identified a couple of reliability issues with the broker code for share groups:
# The broker seems to get stuck at times when using multiple share consumers, due to a corner case where the second-to-last fetch request did not contain any topic partition to fetch, because of which the broker could never complete the last request. This results in a share fetch request getting stuck.
# Since the persister does not perform any business logic around sending state batches for a share partition, there could be scenarios where it sends state batches with no AVAILABLE records. This could breach the configured limit of in-flight messages, and hence the broker would never be able to complete the share fetch requests.
[jira] [Commented] (KAFKA-15330) Migration from ZK to KRaft works with 3.4 but fails from 3.5 upwards
[ https://issues.apache.org/jira/browse/KAFKA-15330?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869242#comment-17869242 ]

Roland Sommer commented on KAFKA-15330:

Finally, with kafka 3.7.1 all migrations were successfully finished on our kafka clusters.
[jira] [Resolved] (KAFKA-16883) Zookeeper-Kraft failing migration - RPC got timed out before it could be sent
[ https://issues.apache.org/jira/browse/KAFKA-16883?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nicolas Henneaux resolved KAFKA-16883. -- Fix Version/s: 3.7.1 Resolution: Fixed I re-executed the migration on v3.7.1 and now it works! > Zookeeper-Kraft failing migration - RPC got timed out before it could be sent > - > > Key: KAFKA-16883 > URL: https://issues.apache.org/jira/browse/KAFKA-16883 > Project: Kafka > Issue Type: Bug > Components: kraft >Affects Versions: 3.7.0, 3.6.1, 3.6.2 >Reporter: Nicolas Henneaux >Priority: Major > Fix For: 3.7.1 > > > Despite several attempts to migrate from a Zookeeper cluster to Kraft, the > migration failed. > We spawned a new, fully healthy cluster with 3 Kafka nodes connected to 3 > Zookeeper nodes. 3 new Kafka nodes are there for the new controllers. > It was tested with Kafka 3.6.1, 3.6.2 and 3.7.0. > It might be linked to KAFKA-15330. > The controllers are started without issue. When the brokers are then > configured for the migration, the migration does not start. Once the last > broker is restarted, we get the following logs. > {code:java} > [2024-06-03 15:11:48,192] INFO [ReplicaFetcherThread-0-11]: Stopped > (kafka.server.ReplicaFetcherThread) > [2024-06-03 15:11:48,193] INFO [ReplicaFetcherThread-0-11]: Shutdown > completed (kafka.server.ReplicaFetcherThread) > {code} > Then we only get the following every 30s > {code:java} > [2024-06-03 15:12:04,163] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. (kafka.server.BrokerLifecycleManager) > [2024-06-03 15:12:34,297] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. 
(kafka.server.BrokerLifecycleManager) > [2024-06-03 15:13:04,536] INFO [BrokerLifecycleManager id=12 isZkBroker=true] > Unable to register the broker because the RPC got timed out before it could > be sent. (kafka.server.BrokerLifecycleManager){code} > The config on the controller node is the following > {code:java} > kafka0202e1 ~]$ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties | > grep -v password | sort > advertised.host.name=kafka0202e1.ahub.sb.eu.ginfra.net > broker.rack=e1 > controller.listener.names=CONTROLLER > controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093 > default.replication.factor=3 > delete.topic.enable=false > group.initial.rebalance.delay.ms=3000 > inter.broker.protocol.version=3.7 > listeners=CONTROLLER://kafka0202e1.ahub.sb.eu.ginfra.net:9093 > listener.security.protocol.map=CONTROLLER:SSL,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL > log.dirs=/data/kafka > log.message.format.version=3.6 > log.retention.check.interval.ms=30 > log.retention.hours=240 > log.segment.bytes=1073741824 > min.insync.replicas=2 > node.id=20 > num.io.threads=8 > num.network.threads=3 > num.partitions=1 > num.recovery.threads.per.data.dir=1 > offsets.topic.replication.factor=3 > process.roles=controller > security.inter.broker.protocol=SSL > socket.receive.buffer.bytes=102400 > socket.request.max.bytes=104857600 > socket.send.buffer.bytes=102400 > ssl.cipher.suites=TLS_AES_256_GCM_SHA384 > ssl.client.auth=required > ssl.enabled.protocols=TLSv1.3 > ssl.endpoint.identification.algorithm=HTTPS > ssl.keystore.location=/etc/kafka/ssl/keystore.ts > ssl.keystore.type=JKS > ssl.secure.random.implementation=SHA1PRNG > ssl.truststore.location=/etc/kafka/ssl/truststore.ts > transaction.state.log.min.isr=3 > transaction.state.log.replication.factor=3 > unclean.leader.election.enable=false > 
zookeeper.connect=10.135.65.199:2181,10.133.65.199:2181,10.137.64.56:2181, > zookeeper.metadata.migration.enable=true > {code} > The config on the broker node is the following > {code} > $ sudo grep -v '^\s*$\|^\s*\#' /etc/kafka/server.properties | grep -v > password | sort > advertised.host.name=kafka0201e3.ahub.sb.eu.ginfra.net > advertised.listeners=SSL://kafka0201e3.ahub.sb.eu.ginfra.net:9092 > broker.id=12 > broker.rack=e3 > controller.listener.names=CONTROLLER # added once all controllers were started > controller.quorum.voters=2...@kafka0202e1.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e2.ahub.sb.eu.ginfra.net:9093,2...@kafka0202e3.ahub.sb.eu.ginfra.net:9093 > # added once
[jira] [Commented] (KAFKA-17033) Consider replacing the type of the ReplicaKey directory id field.
[ https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869192#comment-17869192 ] José Armando García Sancio commented on KAFKA-17033: [~frankvicky] This Jira is targeting ReplicaKey. We can extend the scope of this Jira and PR to MetaProperties if we find it useful there. But I am not sure if that is the case. > Consider replacing the type of the ReplicaKey directory id field. > - > > Key: KAFKA-17033 > URL: https://issues.apache.org/jira/browse/KAFKA-17033 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > > The current type for the directoryId field in ReplicaKey is Optional<Uuid>. The > field is Optional.empty when the directory id is Uuid.ZERO_UUID. This is nice > because it forces KRaft to handle the empty case differently. > The issue with this modeling is that anytime KRaft needs to serialize the > directory id it does an explicit conversion to the zero uuid when the option is > empty. > It is possible that adding a type like DirectoryId could improve the user > experience when dealing with the directory uuid. -- This message was sent by Atlassian Jira (v8.20.10#820010)
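To make the idea in the description concrete, here is a minimal sketch of what a `DirectoryId` wrapper could look like. It is not the actual Kafka class: the method names (`of`, `isAssigned`, `toWire`, `asOptional`) are hypothetical, and `java.util.UUID` stands in for Kafka's own `Uuid` so that the example is self-contained. The point it illustrates is that the zero uuid is kept inside the type, so serialization needs no explicit conversion, while callers that care about the unassigned case still get an `Optional` view.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

/** Hypothetical value type for illustration only; not the actual Kafka implementation. */
final class DirectoryId {
    // Assumption: stand-in for Kafka's Uuid.ZERO_UUID, modeled with java.util.UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);
    static final DirectoryId UNASSIGNED = new DirectoryId(ZERO_UUID);

    private final UUID value;

    private DirectoryId(UUID value) {
        this.value = value;
    }

    /** Wraps a uuid; the zero uuid maps to the shared UNASSIGNED instance. */
    static DirectoryId of(UUID value) {
        Objects.requireNonNull(value, "value");
        return ZERO_UUID.equals(value) ? UNASSIGNED : new DirectoryId(value);
    }

    /** True when the directory id is anything other than the zero uuid. */
    boolean isAssigned() {
        return !ZERO_UUID.equals(value);
    }

    /** Value to serialize; already the zero uuid when unassigned, so no conversion is needed. */
    UUID toWire() {
        return value;
    }

    /** Optional view for callers that must handle the unassigned case differently. */
    Optional<UUID> asOptional() {
        return isAssigned() ? Optional.of(value) : Optional.empty();
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof DirectoryId && value.equals(((DirectoryId) o).value);
    }

    @Override
    public int hashCode() {
        return value.hashCode();
    }

    @Override
    public String toString() {
        return "DirectoryId(" + value + ")";
    }
}
```

With a type like this, the serialization path would call `toWire()` directly, and only the code that branches on presence would use `asOptional()`.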
[jira] [Updated] (KAFKA-17033) Consider replacing the type of the ReplicaKey directory id field.
[ https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-17033: --- Description: The current type for the directoryId field in ReplicaKey is Optional<Uuid>. The field is Optional.empty when the directory id is Uuid.ZERO_UUID. This is nice because it forces KRaft to handle the empty case differently. The issue with this modeling is that anytime KRaft needs to serialize the directory id it does an explicit conversion to the zero uuid when the option is empty. It is possible that adding a type like DirectoryId could improve the user experience when dealing with the directory uuid. was:One way to handle this is to introduce a type called `DirectoryId` that just encapsulates a Uuid but it is able to better handle the Uuid.ZERO_UUID case. > Consider replacing the type of the ReplicaKey directory id field. > - > > Key: KAFKA-17033 > URL: https://issues.apache.org/jira/browse/KAFKA-17033 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > > The current type for the directoryId field in ReplicaKey is Optional<Uuid>. The > field is Optional.empty when the directory id is Uuid.ZERO_UUID. This is nice > because it forces KRaft to handle the empty case differently. > The issue with this modeling is that anytime KRaft needs to serialize the > directory id it does an explicit conversion to the zero uuid when the option is > empty. > It is possible that adding a type like DirectoryId could improve the user > experience when dealing with the directory uuid. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17033) Consider replacing the type of the ReplicaKey directory id field.
[ https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-17033: --- Summary: Consider replacing the type of the ReplicaKey directory id field. (was: Consider replacing the ReplicaKey Optional with just Uuid) > Consider replacing the type of the ReplicaKey directory id field. > - > > Key: KAFKA-17033 > URL: https://issues.apache.org/jira/browse/KAFKA-17033 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > > One way to handle this is to introduce a type called `DirectoryId` that just > encapsulates a Uuid but it is able to better handle the Uuid.ZERO_UUID case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17033) Consider replacing the ReplicaKey Optional with just Uuid
[ https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio updated KAFKA-17033: --- Summary: Consider replacing the ReplicaKey Optional with just Uuid (was: Consider replacing the directory id Optional with just Uuid) > Consider replacing the ReplicaKey Optional with just Uuid > --- > > Key: KAFKA-17033 > URL: https://issues.apache.org/jira/browse/KAFKA-17033 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > > One way to handle this is to introduce a type called `DirectoryId` that just > encapsulates a Uuid but it is able to better handle the Uuid.ZERO_UUID case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16915) Update leader change message
[ https://issues.apache.org/jira/browse/KAFKA-16915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] José Armando García Sancio resolved KAFKA-16915. Assignee: Alyssa Huang Resolution: Fixed > Update leader change message > > > Key: KAFKA-16915 > URL: https://issues.apache.org/jira/browse/KAFKA-16915 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Assignee: Alyssa Huang >Priority: Major > Fix For: 3.9.0 > > > It would be good to include the schema changes as part of 3.9 even if they > are not populated. It would make it compatible with kraft.version 1. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally
[ https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869167#comment-17869167 ] bboyleonp commented on KAFKA-16701: --- [~gharris1727] I have added custom logs to narrow down the issue and find the following information Log for test `closingChannelSendFailure` on JDK 17 ({color:#de350b}Fail{color}) {code:java} [2024-07-28 19:46:31,751] DEBUG Leon: before receiveRequest (kafka:700) [2024-07-28 19:46:31,751] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471) [2024-07-28 19:46:31,751] DEBUG Leon: the connectionQuotas of inetAddress is 1 (kafka.network.SocketServerTest$TestableProcessor:62) [2024-07-28 19:46:32,053] DEBUG Leon: the connectionQuotas of inetAddress is 0 (kafka.network.SocketServerTest$TestableProcessor:62) [2024-07-28 19:46:32,053] ERROR Exception while processing disconnection of 127.0.0.1:51325-127.0.0.1:51327-0 (kafka.network.SocketServerTest$TestableProcessor:76) java.lang.IllegalArgumentException: Attempted to decrease connection count for address with no connections, address: /127.0.0.1 at kafka.network.ConnectionQuotas.$anonfun$dec$1(SocketServer.scala:1535) at scala.collection.mutable.HashMap.getOrElse(HashMap.scala:451) at kafka.network.ConnectionQuotas.dec(SocketServer.scala:1535) at kafka.network.Processor.$anonfun$processDisconnected$1(SocketServer.scala:1225) at java.base/java.util.HashMap$KeySet.forEach(HashMap.java:1008) at kafka.network.Processor.processDisconnected(SocketServer.scala:1216) at kafka.network.Processor.run(SocketServer.scala:1019) at java.base/java.lang.Thread.run(Thread.java:840) [2024-07-28 19:46:41,755] DEBUG Leon: request obtained by requestQueue is null (kafka.request.logger:476) [2024-07-28 19:46:41,755] DEBUG Leon: before finally::proxyServer.close (kafka:703) {code} Log for test `closingChannelSendFailure` on JDK 11 ({color:#00875a}Success{color}) {code:java} [2024-07-28 19:24:48,265] DEBUG Leon: before receiveRequest 
(kafka:700) [2024-07-28 19:24:48,265] DEBUG Leon: request obtained by callbackQueue is null (kafka.request.logger:471) [2024-07-28 19:24:48,265] DEBUG Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":0,"requestApiVersion":11,"correlationId":-1,"clientId":"","requestApiKeyName":"PRODUCE"},"request":{"transactionalId":null,"acks":0,"timeoutMs":1,"topicData":[]},"response":"","connection":"127.0.0.1:50229-127.0.0.1:50231-0","totalTimeMs":104.595,"requestQueueTimeMs":0.0,"localTimeMs":3.2080946725E7,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.316,"sendTimeMs":0.03,"securityProtocol":"SSL","principal":"User:ANONYMOUS","listener":"SSL","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} (kafka.request.logger:279) [2024-07-28 19:24:48,265] TRACE Socket server received empty response to send, registering for read: Response(type=NoOp, request=Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@242ff747, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None)) (kafka.network.SocketServerTest$TestableProcessor:54) [2024-07-28 19:24:48,266] TRACE Processor 0 received request: RequestHeader(apiKey=PRODUCE, apiVersion=11, clientId=, correlationId=-1, headerVersion=2) -- {acks=0,timeout=1,partitionSizes=[]} (kafka.network.RequestChannel$:45) [2024-07-28 19:24:48,266] DEBUG Leon: request obtained by requestQueue is Request(processor=0, connectionId=127.0.0.1:50229-127.0.0.1:50231-0, session=org.apache.kafka.network.Session@5329f6b3, listenerName=ListenerName(SSL), securityProtocol=SSL, buffer=java.nio.HeapByteBuffer[pos=20 lim=20 cap=20], envelope=None) (kafka.request.logger:476) [2024-07-28 19:24:48,267] DEBUG Leon: before finally::proxyServer.close (kafka:703) [2024-07-28 19:24:48,267] DEBUG Leon: before sslConnect (kafka:1469) [2024-07-28 19:24:48,268] DEBUG Leon: before sendRequest 
(kafka:1471) {code} This test will call `receiveRequest` in _RequestChannel.scala_ to poll for `callbackQueue` and `requestQueue`. I found a daemon in _SocketServer.scala_ that will monitor and count the valid connections which is maintained by `connectionQuotas`. It seems that the connections in JDK 17 are disconnected unexpectedly. *Please find the 2 lines marked in red.* *Could you help to verify if you can find the same behavior on your environment as well?* Her
[jira] [Commented] (KAFKA-13906) Invalid replica state transition
[ https://issues.apache.org/jira/browse/KAFKA-13906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869164#comment-17869164 ] Gaurav Narula commented on KAFKA-13906: --- Hi [~johnnyhsu] I wanted to check if you're still working on this? Please feel free to assign it to me otherwise as I think I may have a workaround. > Invalid replica state transition > > > Key: KAFKA-13906 > URL: https://issues.apache.org/jira/browse/KAFKA-13906 > Project: Kafka > Issue Type: Bug > Components: controller, core, replication >Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.1.2, > 3.2.1 >Reporter: Igor Soarez >Assignee: Johnny Hsu >Priority: Major > Labels: BUG, controller, replication, reproducible-bug > > The controller runs into an IllegalStateException when reacting to changes in > broker membership status if there are topics that are pending deletion. > > How to reproduce: > # Setup cluster with 3 brokers > # Create a topic with a partition being led by each broker and produce some > data > # Kill one of the brokers that is not the controller, and keep that broker > down > # Delete the topic > # Restart the other broker that is not the controller > > Logs and stacktrace: > {{[2022-05-16 11:53:25,482] ERROR [Controller id=1 epoch=1] Controller 1 > epoch 1 initiated state change of replica 3 for partition test-topic-2 from > ReplicaDeletionSuccessful to ReplicaDeletionIneligible failed > (state.change.logger)}} > {{java.lang.IllegalStateException: Replica > [Topic=test-topic,Partition=2,Replica=3] should be in the > OfflineReplica,ReplicaDeletionStarted states before moving to > ReplicaDeletionIneligible state. 
Instead it is in ReplicaDeletionSuccessful > state}} > {{ at > kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} > {{ at scala.collection.immutable.List.foreach(List.scala:333)}} > {{ at > kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:112)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:111)}} > {{ at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)}} > {{ at > scala.collection.immutable.HashMap.foreachEntry(HashMap.scala:1092)}} > {{ at > kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:111)}} > {{ at > kafka.controller.TopicDeletionManager.failReplicaDeletion(TopicDeletionManager.scala:157)}} > {{ at > kafka.controller.KafkaController.onReplicasBecomeOffline(KafkaController.scala:638)}} > {{ at > kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:599)}} > {{ at > kafka.controller.KafkaController.processBrokerChange(KafkaController.scala:1623)}} > {{ at > kafka.controller.KafkaController.process(KafkaController.scala:2534)}} > {{ at > kafka.controller.QueuedEvent.process(ControllerEventManager.scala:52)}} > {{ at > kafka.controller.ControllerEventManager$ControllerEventThread.process$1(ControllerEventManager.scala:130)}} > {{--}} > {{[2022-05-16 11:53:40,726] ERROR [Controller id=1 epoch=1] Controller 1 > epoch 1 initiated state change of replica 3 for partition test-topic-2 from > ReplicaDeletionSuccessful to OnlineReplica failed (state.change.logger)}} > 
{{java.lang.IllegalStateException: Replica > [Topic=test-topic,Partition=2,Replica=3] should be in the > NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible states > before moving to OnlineReplica state. Instead it is in > ReplicaDeletionSuccessful state}} > {{ at > kafka.controller.ZkReplicaStateMachine.logInvalidTransition(ReplicaStateMachine.scala:442)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2(ReplicaStateMachine.scala:164)}} > {{ at > kafka.controller.ZkReplicaStateMachine.$anonfun$doHandleStateChanges$2$adapted(ReplicaStateMachine.scala:164)}} > {{ at scala.collection.immutable.List.foreach(List.scala:333)}} > {{ at > kafka
[jira] [Commented] (KAFKA-17206) Use v1 of LeaderChangeMessage when kraft.version is 1
[ https://issues.apache.org/jira/browse/KAFKA-17206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869152#comment-17869152 ] Meisam Jafari commented on KAFKA-17206: --- Hi Alyssa Huang, could I take this issue? > Use v1 of LeaderChangeMessage when kraft.version is 1 > - > > Key: KAFKA-17206 > URL: https://issues.apache.org/jira/browse/KAFKA-17206 > Project: Kafka > Issue Type: Task >Reporter: Alyssa Huang >Priority: Minor > > [https://github.com/apache/kafka/pull/16668] introduced v1 of LCM but still > uses v0 of the schema. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16701) Some SocketServerTest buffered close tests flaky failing locally
[ https://issues.apache.org/jira/browse/KAFKA-16701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869149#comment-17869149 ] bboyleonp commented on KAFKA-16701: --- [~gharris1727] I agree that we should keep investigating this issue. It seems the inter-test interaction that caused `closingChannelSendFailure` to execute in between `processDisconnectedException` is no longer reproducible in my environment, which aligns with our previous thoughts. I am going to set aside the previous finding on this one and try to figure out if there's some other potential flaw. > It looks like the full suite almost always fails, and tests on their own > almost always pass I see the same behavior. It seems the problem does not lie in the test cases themselves, but in the execution of the whole suite. I am going to investigate in this direction to see if I can find any clues. > Opening and closing sockets will inherently change the state of the process, > and so its always possible that sockets from one test are being picked up by > another. That is a great point. I printed out the PID and TID of `processDisconnectedException` and `closingChannelSendFailure` for testing purposes, and they both use the same PID and TID. There's a chance that this is caused by resource reuse. I will update if I have any new findings. > Some SocketServerTest buffered close tests flaky failing locally > > > Key: KAFKA-16701 > URL: https://issues.apache.org/jira/browse/KAFKA-16701 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Affects Versions: 3.5.0, 3.6.0, 3.7.0 >Reporter: Greg Harris >Assignee: bboyleonp >Priority: Major > Labels: flaky-test > > These tests are failing for me on a local development environment, but don't > appear to be flaky or failing in CI. They only appear to fail for JDK >= 17. > I'm using an M1 Mac, so it is possible that either the Mac's linear port > allocation, or a native implementation is impacting this. 
> closingChannelSendFailure() > > {noformat} > java.lang.AssertionError: receiveRequest timed out > at > kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) > at > kafka.network.SocketServerTest.makeChannelWithBufferedRequestsAndCloseRemote(SocketServerTest.scala:690) > at > kafka.network.SocketServerTest.$anonfun$verifySendFailureAfterRemoteClose$1(SocketServerTest.scala:1434) > at > kafka.network.SocketServerTest.verifySendFailureAfterRemoteClose(SocketServerTest.scala:1430) > at > kafka.network.SocketServerTest.closingChannelSendFailure(SocketServerTest.scala:1425){noformat} > closingChannelWithBufferedReceivesFailedSend() > > {noformat} > java.lang.AssertionError: receiveRequest timed out > at > kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) > at > kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) > at > kafka.network.SocketServerTest.closingChannelWithBufferedReceivesFailedSend(SocketServerTest.scala:1520){noformat} > closingChannelWithCompleteAndIncompleteBufferedReceives() > {noformat} > java.lang.AssertionError: receiveRequest timed out > at > kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(SocketServerTest.scala:1591) > at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$1(SocketServerTest.scala:1590) > at > kafka.network.SocketServerTest.verifyRemoteCloseWithBufferedReceives(SocketServerTest.scala:1553) > at > 
kafka.network.SocketServerTest.closingChannelWithCompleteAndIncompleteBufferedReceives(SocketServerTest.scala:1511) > {noformat} > remoteCloseWithBufferedReceives() > {noformat} > java.lang.AssertionError: receiveRequest timed out > at > kafka.network.SocketServerTest.receiveRequest(SocketServerTest.scala:148) > at > kafka.network.SocketServerTest.$anonfun$verifyRemoteCloseWithBufferedReceives$6(S
[jira] [Updated] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-17209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] TaiJuWu updated KAFKA-17209: External issue URL: https://github.com/apache/kafka/pull/16703 > Revisit testCurrentLag for AsyncKafkaConsumer > - > > Key: KAFKA-17209 > URL: https://issues.apache.org/jira/browse/KAFKA-17209 > Project: Kafka > Issue Type: Test >Reporter: TaiJuWu >Assignee: TaiJuWu >Priority: Major > > The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that > ClassicConsumer is single-threaded, while AsyncConsumer uses two threads: an > application thread and a background thread. > The {{Fetch}} request is generated by the background thread and the {{ListOffset}} > request is generated by the application thread. > This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} > request first. > So we add out-of-order response support to {{MockClient}} to resolve the issue. > Another difference between AsyncConsumer and ClassicConsumer is that the former > generates an additional {{FetchOffset}} request from CommitRequestManager, while > ClassicConsumer does not send this request. > The reason is that the coordinator is not ready during the test for {{ClassicConsumer}}, so > we can ignore this request for {{AsyncConsumer}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17209) Revisit testCurrentLag for AsyncKafkaConsumer
TaiJuWu created KAFKA-17209: --- Summary: Revisit testCurrentLag for AsyncKafkaConsumer Key: KAFKA-17209 URL: https://issues.apache.org/jira/browse/KAFKA-17209 Project: Kafka Issue Type: Test Reporter: TaiJuWu Assignee: TaiJuWu The difference between {{ClassicConsumer}} and {{AsyncConsumer}} is that ClassicConsumer is single-threaded, while AsyncConsumer uses two threads: an application thread and a background thread. The {{Fetch}} request is generated by the background thread and the {{ListOffset}} request is generated by the application thread. This makes {{testCurrentLag}} fail if AsyncConsumer generates the {{Fetch}} request first. So we add out-of-order response support to {{MockClient}} to resolve the issue. Another difference between AsyncConsumer and ClassicConsumer is that the former generates an additional {{FetchOffset}} request from CommitRequestManager, while ClassicConsumer does not send this request. The reason is that the coordinator is not ready during the test for {{ClassicConsumer}}, so we can ignore this request for {{AsyncConsumer}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
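The out-of-order idea described above can be sketched as follows. This is only an illustration of the technique, not the `MockClient` change from the linked PR: the class `PreparedResponses` and its methods are hypothetical, and requests are modeled as plain generic values. Each prepared response carries a matcher, and an incoming request takes the first prepared response whose matcher accepts it, rather than whatever sits at the head of the queue. That way a test passes regardless of whether the background thread's `Fetch` or the application thread's `ListOffset` arrives first.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical helper for illustration; not part of Kafka's MockClient API. */
final class PreparedResponses<Req, Resp> {

    /** A prepared response together with the predicate selecting the request it answers. */
    private static final class Entry<Req, Resp> {
        final Predicate<Req> matcher;
        final Resp response;

        Entry(Predicate<Req> matcher, Resp response) {
            this.matcher = matcher;
            this.response = response;
        }
    }

    private final List<Entry<Req, Resp>> pending = new ArrayList<>();

    /** Registers a response to be returned for the first request that satisfies the matcher. */
    void prepare(Predicate<Req> matcher, Resp response) {
        pending.add(new Entry<>(matcher, response));
    }

    /**
     * Returns and consumes the first prepared response whose matcher accepts the request,
     * independent of the order in which responses were prepared.
     */
    Optional<Resp> respondTo(Req request) {
        Iterator<Entry<Req, Resp>> it = pending.iterator();
        while (it.hasNext()) {
            Entry<Req, Resp> entry = it.next();
            if (entry.matcher.test(request)) {
                it.remove();
                return Optional.of(entry.response);
            }
        }
        return Optional.empty();
    }

    /** Number of prepared responses that have not been consumed yet. */
    int remaining() {
        return pending.size();
    }
}
```

A strict FIFO mock would fail as soon as the request order differs from the preparation order; matching by predicate removes that ordering assumption at the cost of requiring each prepared response to describe the request it belongs to.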
[jira] [Assigned] (KAFKA-8366) partitions of topics being deleted show up in the offline partitions metric
[ https://issues.apache.org/jira/browse/KAFKA-8366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gaurav Narula reassigned KAFKA-8366: Assignee: Gaurav Narula > partitions of topics being deleted show up in the offline partitions metric > --- > > Key: KAFKA-8366 > URL: https://issues.apache.org/jira/browse/KAFKA-8366 > Project: Kafka > Issue Type: Bug > Components: controller, core, metrics >Affects Versions: 3.1.0, 3.0.0, 3.0.1, 3.2.0, 3.1.1, 3.3.0, 3.0.2, 3.2.1 >Reporter: Radai Rosenblatt >Assignee: Gaurav Narula >Priority: Major > Labels: BUG, controller, metrics, reproducible-bug > > I believe this is a bug. > Offline partitions is a metric that indicates an error condition: a lack of > Kafka availability. > As an artifact of how deletion is implemented, the partitions of a topic > undergoing deletion will show up as offline, which just creates > false-positive alerts. > If needed, maybe there should be a separate "partitions to be deleted" > sensor. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2
[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869062#comment-17869062 ] George Yang commented on KAFKA-17186: - Great! That solution works well. How did you determine that 'distributed' should be removed as a prefix? From what I understand, we typically set parameters like 'producer', 'consumer', and 'admin' in the following format: * {{{source}.consumer.\{consumer_config_name}}} * {{{target}.producer.\{producer_config_name}}} * {{{source_or_target}.admin.\{admin_config_name}}} These conventions are recommended by the Kafka official documentation. However, for custom Kafka Connect settings such as DistributedConfig, JsonConverterConfig, ConnectorConfig, SourceConnectorConfig, EnrichedConnectorConfig, MirrorCheckpointConfig, MirrorHeartbeatConfig, MirrorCheckpointTaskConfig, and others, can I simply omit '\{someConfig}' and set them using '\{alias}.\{some_config_name}'? > Cannot receive message after stopping Source Mirror Maker 2 > --- > > Key: KAFKA-17186 > URL: https://issues.apache.org/jira/browse/KAFKA-17186 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.1 > Environment: Source Kafka Cluster per Node: > CPU(s): 32 > Memory: 32G/1.1G free > Target Kafka Cluster standalone Node: > CPU(s): 24 > Memory: 30G/909M free > Kafka Version 3.7 > Mirrormaker Version 3.7.1 >Reporter: George Yang >Priority: Major > Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out > > > Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node > 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. > Currently, a service on node 1 in Data Center A acts as a producer sending > messages to the `myTest` topic. A service in Data Center B acts as a consumer > listening to `A.myTest`. > The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer > in Data Center B ceases to receive messages. 
Even after restarting MM2 in > Data Center A, the consumer in Data Center B still does not receive messages > until approximately 5 minutes later when a rebalance occurs, at which point > it begins receiving messages again. > > [Logs From Consumer on Data Center B] > [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] > Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242) > [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] > (Re-)joining group > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604) > [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] > Successfully joined group with generation Generation\{generationId=52, > memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] > Successfully synced group in generation Generation\{generationId=52, > memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined > group at generation 52 with protocol version 2 and got assignment: > Assignment\{error=0, 
leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', > leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], > taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, > MirrorCheckpointConnector-2], revokedConnectorIds=[], revokedTaskIds=[], > delay=0} with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting > connectors and tasks using config offset 1360 > (org.apache.kafka.connect.runtime.distributed.Distribu
[jira] [Commented] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients
[ https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17869050#comment-17869050 ] Xuze Yang commented on KAFKA-17207: --- Hi [~gharris1727], if you're not working on this, may I take it? Thank you. > ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking > clients > - > > Key: KAFKA-17207 > URL: https://issues.apache.org/jira/browse/KAFKA-17207 > Project: Kafka > Issue Type: Test > Components: connect >Affects Versions: 3.9.0 >Reporter: Greg Harris >Priority: Minor > Labels: newbie > > The testRequestTimeouts test deletes the internal config topic, putting the > connect workers into a bad state. When the test goes to clean up, it calls > DistributedHerder#stop, which waits for the herder executor to stop. This > times out, because the herder executor is blocked closing the > KafkaConfigBackingStore's producer. This log message gets printed: > {noformat} > [2024-07-26 11:52:50,817] ERROR Executor > java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = > 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not > terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat} > This effectively leaks the Kafka clients for the workers' internal topics, > and the herder executor thread. Instead, either the producer should not block > indefinitely on a missing topic, or the cluster state should be healed enough > for the producer to shut down cleanly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14830) Illegal state error in transactional producer
[ https://issues.apache.org/jira/browse/KAFKA-14830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14830: -- Priority: Critical (was: Major) > Illegal state error in transactional producer > - > > Key: KAFKA-14830 > URL: https://issues.apache.org/jira/browse/KAFKA-14830 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 3.1.2 >Reporter: Jason Gustafson >Assignee: Kirk True >Priority: Critical > > We have seen the following illegal state error in the producer: > {code:java} > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-0:120027 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > topic-1:120026 ms has passed since batch creation > [Producer clientId=client-id2, transactionalId=transactional-id] Aborting > incomplete transaction > [Producer clientId=client-id2, transactionalId=transactional-id] Invoking > InitProducerId with current producer ID and epoch > ProducerIdAndEpoch(producerId=191799, epoch=0) in order to bump the epoch > [Producer clientId=client-id2, transactionalId=transactional-id] ProducerId > set to 191799 with epoch 1 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.NetworkException: Disconnected from node 4 > [Producer clientId=client-id2, transactionalId=transactional-id] Transiting > to abortable error state due to > org.apache.kafka.common.errors.TimeoutException: The request timed out. 
> [Producer clientId=client-id2, transactionalId=transactional-id] Uncaught > error in request completion: > java.lang.IllegalStateException: TransactionalId transactional-id: Invalid > transition attempted from state READY to state ABORTABLE_ERROR > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:1089) > at > org.apache.kafka.clients.producer.internals.TransactionManager.transitionToAbortableError(TransactionManager.java:508) > at > org.apache.kafka.clients.producer.internals.TransactionManager.maybeTransitionToErrorState(TransactionManager.java:734) > at > org.apache.kafka.clients.producer.internals.TransactionManager.handleFailedBatch(TransactionManager.java:739) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:753) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:743) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:695) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:634) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$null$1(Sender.java:575) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1541) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$handleProduceResponse$2(Sender.java:562) > at java.base/java.lang.Iterable.forEach(Iterable.java:75) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:562) > at > org.apache.kafka.clients.producer.internals.Sender.lambda$sendProduceRequest$5(Sender.java:836) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:583) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:575) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:328) > at > 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) > at java.base/java.lang.Thread.run(Thread.java:829) > {code} > The producer hits timeouts which cause it to abort an active transaction. > After aborting, the producer bumps its epoch, which transitions it back to > the `READY` state. Following this, there are two errors for inflight > requests, which cause an illegal state transition to `ABORTABLE_ERROR`. But > how could the transaction ABORT complete if there were still inflight > requests? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15402) Performance regression on close consumer after upgrading to 3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15402?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-15402: -- Priority: Critical (was: Major) > Performance regression on close consumer after upgrading to 3.5.0 > - > > Key: KAFKA-15402 > URL: https://issues.apache.org/jira/browse/KAFKA-15402 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.5.0, 3.6.0, 3.5.1 >Reporter: Benoit Delbosc >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 3.9.0 > > Attachments: image-2023-08-24-18-51-21-720.png, > image-2023-08-24-18-51-57-435.png, image-2023-08-25-10-50-28-079.png > > > Hi, > After upgrading to Kafka client version 3.5.0, we have observed a significant > increase in the duration of our Java unit tests. These unit tests heavily > rely on the Kafka Admin, Producer, and Consumer API. > When using Kafka server version 3.4.1, the duration of the unit tests > increased from 8 seconds (with Kafka client 3.4.1) to 18 seconds (with Kafka > client 3.5.0). > Upgrading the Kafka server to 3.5.1 shows similar results. > I have come across the issue KAFKA-15178, which could be the culprit. I will > attempt to test the proposed patch. > In the meantime, if you have any ideas that could help identify and address > the regression, please let me know. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16444) Run KIP-848 unit tests under code coverage
[ https://issues.apache.org/jira/browse/KAFKA-16444?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16444: -- Priority: Minor (was: Major) > Run KIP-848 unit tests under code coverage > -- > > Key: KAFKA-16444 > URL: https://issues.apache.org/jira/browse/KAFKA-16444 > Project: Kafka > Issue Type: Task > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17182) Consumer's fetch sessions are evicted too quickly
[ https://issues.apache.org/jira/browse/KAFKA-17182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17182: -- Priority: Critical (was: Major) > Consumer's fetch sessions are evicted too quickly > - > > Key: KAFKA-17182 > URL: https://issues.apache.org/jira/browse/KAFKA-17182 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor > Fix For: 4.0.0 > > > In stress testing the new consumer, the new consumer is evicting fetch > sessions on the broker much more frequently than expected. There is an > ongoing investigation into this behavior, but it appears to stem from a race > condition due to the design of the new consumer. > In the background thread, fetch requests are sent in a near continuous > fashion for partitions that are "fetchable." A timing bug appears to cause > partitions to be "unfetchable," which then causes them to end up in the > "removed" set of partitions. The broker then removes them from the fetch > session, which causes the number of remaining partitions for that session to > drop below a threshold that allows it to be evicted by another competing > session. Within a few milliseconds, though, the partitions become "fetchable" > again, and are added to the "added" set of partitions on the next fetch > request. This causes thrashing on both the client and broker sides as both > are handling a steady stream of evictions, which negatively affects > consumption throughput. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15320) Document event queueing patterns
[ https://issues.apache.org/jira/browse/KAFKA-15320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15320: - Assignee: (was: Kirk True) > Document event queueing patterns > > > Key: KAFKA-15320 > URL: https://issues.apache.org/jira/browse/KAFKA-15320 > Project: Kafka > Issue Type: Task > Components: clients, consumer, documentation >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, events > Fix For: 3.9.0 > > > We need to first document the event enqueuing patterns in the > PrototypeAsyncConsumer. As part of this task, determine if it’s > necessary/beneficial to _conditionally_ add events and/or coalesce any > duplicate events in the queue. > _Don’t forget to include diagrams for clarity!_ > This should be documented on the AK wiki. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17183) New consumer system tests pass for subset of tests, but fail if running all tests
[ https://issues.apache.org/jira/browse/KAFKA-17183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-17183: -- Priority: Blocker (was: Major) > New consumer system tests pass for subset of tests, but fail if running all > tests > - > > Key: KAFKA-17183 > URL: https://issues.apache.org/jira/browse/KAFKA-17183 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Blocker > Labels: consumer-threading-refactor, system-tests > Fix For: 3.9.0 > > > In 3.8, many of the system tests were updated to exercise both the old > consumer and the new consumer. For quicker feedback, I have been creating a > YAML-based test suite for just the subset of tests that were updated. When > running the system tests in the subset, I am consistently seeing 100% pass > rate. Recently I started running the entire test suite, and am now seeing > many failures. > The cause is as yet unknown, but I am adding this here as a task to fix whatever > is causing this behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15638) Investigate ConsumerNetworkThreadTest's testPollResultTimer
[ https://issues.apache.org/jira/browse/KAFKA-15638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15638: - Assignee: (was: Kirk True) > Investigate ConsumerNetworkThreadTest's testPollResultTimer > --- > > Key: KAFKA-15638 > URL: https://issues.apache.org/jira/browse/KAFKA-15638 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor, timeout, unit-tests > Fix For: 3.9.0 > > > Regarding this comment in {{{}testPollResultTimer{}}}... > {code:java} > // purposely setting a non-MAX time to ensure it is returning Long.MAX_VALUE > upon success| > {code} > [~junrao] asked: > {quote}Which call is returning Long.MAX_VALUE? > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16290) Investigate propagating subscription state updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16290: - Assignee: (was: Kirk True) > Investigate propagating subscription state updates via queues > - > > Key: KAFKA-16290 > URL: https://issues.apache.org/jira/browse/KAFKA-16290 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lucas Brutschy >Priority: Critical > Labels: consumer-threading-refactor, kip-848, > kip-848-client-support > Fix For: 3.9.0 > > > We are mostly using the queues for interaction between application thread and > background thread, but the subscription object is shared between the threads, > and it is updated directly without going through the queues. > The way we allow updates to the subscription state from both threads is > definitely not right, and will bring trouble. Places like the assign() is > probably the most obvious, where we send an event to the background to > commit, but then update the subscription in the foreground right away. > It seems sensible to aim for having all updates to the subscription state in > the background, triggered from the app thread via events (and I think we > already have related events for all updates, just that the subscription state > was left out in the app thread). -- This message was sent by Atlassian Jira (v8.20.10#820010)
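The idea in the ticket above, routing every subscription update through an event handled by the background thread, can be sketched in plain Java. This is only an illustration under stated assumptions: `AssignEvent` and `BackgroundOwner` are hypothetical names, partitions are modelled as strings, and none of this is the actual `AsyncKafkaConsumer` or `SubscriptionState` code.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

public class SubscriptionEventSketch {

    /** Hypothetical event carrying the requested assignment and a future for the result. */
    static class AssignEvent {
        final Set<String> partitions;
        final CompletableFuture<Set<String>> applied = new CompletableFuture<>();

        AssignEvent(Set<String> partitions) {
            this.partitions = partitions;
        }
    }

    /** Hypothetical owner of the subscription state; only its run() loop mutates it. */
    static class BackgroundOwner implements Runnable {
        private final BlockingQueue<AssignEvent> events = new LinkedBlockingQueue<>();
        private final Set<String> assigned = new HashSet<>();

        /** Called from the application thread: enqueue the update instead of applying it. */
        CompletableFuture<Set<String>> assign(Set<String> partitions) {
            AssignEvent event = new AssignEvent(new HashSet<>(partitions));
            events.add(event);
            return event.applied;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    AssignEvent event = events.take();
                    // The state is changed here and nowhere else.
                    assigned.clear();
                    assigned.addAll(event.partitions);
                    // Hand a copy back so the caller never shares the mutable set.
                    event.applied.complete(new HashSet<>(assigned));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        BackgroundOwner owner = new BackgroundOwner();
        Thread background = new Thread(owner, "background");
        background.setDaemon(true);
        background.start();

        Set<String> result = owner.assign(Set.of("topic-0", "topic-1")).join();
        System.out.println("assigned partitions: " + result.size());
        background.interrupt();
    }
}
```

The application thread only sees the state through the completed future, so there is a single writer and no update can race with an event that is still in the queue.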
[jira] [Assigned] (KAFKA-16109) Write system tests cover the "simple consumer + commit" use case
[ https://issues.apache.org/jira/browse/KAFKA-16109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16109: - Assignee: (was: Kirk True) > Write system tests cover the "simple consumer + commit" use case > > > Key: KAFKA-16109 > URL: https://issues.apache.org/jira/browse/KAFKA-16109 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer, system tests >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor, system-tests > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16315) Investigate propagating metadata updates via queues
[ https://issues.apache.org/jira/browse/KAFKA-16315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16315: - Assignee: (was: Kirk True) > Investigate propagating metadata updates via queues > --- > > Key: KAFKA-16315 > URL: https://issues.apache.org/jira/browse/KAFKA-16315 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.9.0 > > > Some of the new {{AsyncKafkaConsumer}} logic enqueues events for the network > I/O thread then issues a call to update the {{ConsumerMetadata}} via > {{requestUpdate()}}. If the event ends up stuck in the queue for a long time, > it is possible that the metadata is not updated at the correct time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15173) Consumer event queues should be bounded
[ https://issues.apache.org/jira/browse/KAFKA-15173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-15173: - Assignee: (was: Kirk True) > Consumer event queues should be bounded > --- > > Key: KAFKA-15173 > URL: https://issues.apache.org/jira/browse/KAFKA-15173 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor, events > Fix For: 3.9.0 > > > The async consumer uses ApplicationEventQueue and BackgroundEventQueue to > facilitate message passing between the application thread and the background > thread. The current implementation is unbounded, which can potentially cause > OOM and other performance-related issues. > I think the queues need a finite bound, and we need to decide how to handle > the situation when the bound is reached. In particular, I would like to > answer these questions: > > # What should the upper limit be for both queues: Can this be a > configurable, memory-based bound? Or just an arbitrary number of events as > the bound? > # What should happen when the application event queue is filled up? It > seems like we should introduce a new exception type and notify the user that > the consumer is full. > # What should happen when the background event queue is filled up? This > seems less likely to happen, but I imagine it could happen when the user > stops polling the consumer, causing the queue to be filled. > # Is it necessary to introduce a public configuration for the queue? I think > initially we would select an arbitrary constant number and see the community > feedback to make a forward plan accordingly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
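One of the options raised in the questions above, a fixed event-count bound with a dedicated exception when the queue is full, could look roughly like the following. It is a sketch only: `BoundedEventQueue` and `EventQueueFullException` are hypothetical names, events are plain strings, and the ticket does not say that the real queues are or will be built this way.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedEventQueueSketch {

    /** Hypothetical exception type; the ticket only suggests that a new one may be needed. */
    static class EventQueueFullException extends RuntimeException {
        EventQueueFullException(String message) {
            super(message);
        }
    }

    /** Hypothetical queue wrapper whose bound is a fixed number of events. */
    static class BoundedEventQueue<E> {
        private final BlockingQueue<E> queue;

        BoundedEventQueue(int maxEvents) {
            this.queue = new ArrayBlockingQueue<>(maxEvents);
        }

        void add(E event) {
            // offer() returns false right away when the queue is full, instead of blocking.
            if (!queue.offer(event)) {
                throw new EventQueueFullException("event queue is full, rejected: " + event);
            }
        }

        /** Returns the oldest event, or null if the queue is empty. */
        E poll() {
            return queue.poll();
        }

        int size() {
            return queue.size();
        }
    }

    public static void main(String[] args) {
        BoundedEventQueue<String> queue = new BoundedEventQueue<>(2);
        queue.add("poll");
        queue.add("commit");
        try {
            queue.add("fetch");
        } catch (EventQueueFullException e) {
            System.out.println(e.getMessage());
        }
        System.out.println("next event: " + queue.poll());
    }
}
```

Failing fast keeps the application thread from blocking on its own queue; whether to block, drop, or throw is exactly the open design question in the ticket.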
[jira] [Assigned] (KAFKA-16642) Update KafkaConsumerTest to show parameters in test lists
[ https://issues.apache.org/jira/browse/KAFKA-16642?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16642: - Assignee: (was: Kirk True) > Update KafkaConsumerTest to show parameters in test lists > - > > Key: KAFKA-16642 > URL: https://issues.apache.org/jira/browse/KAFKA-16642 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.9.0 > > > {{KafkaConsumerTest}} was recently updated to make many of its tests > parameterized to exercise both the {{CLASSIC}} and {{CONSUMER}} group > protocols. However, in some of the tools in which [lists of tests are > provided|https://ge.apache.org/scans/tests?search.names=Git%20branch=P28D=kafka=America%2FLos_Angeles=trunk=org.apache.kafka.clients.consumer.KafkaConsumerTest=FLAKY], > say, for analysis, the group protocol information is not exposed. For > example, one test ({{{}testReturnRecordsDuringRebalance{}}}) is flaky, but > it's difficult to know at a glance which group protocol is causing the > problem because the list simply shows: > {quote}{{testReturnRecordsDuringRebalance(GroupProtocol)[1]}} > {quote} > Ideally, it would expose more information, such as: > {quote}{{testReturnRecordsDuringRebalance(GroupProtocol=CONSUMER)}} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16966) Allow offset commit fetch to reuse previous request if partitions are a subset
[ https://issues.apache.org/jira/browse/KAFKA-16966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16966: - Assignee: (was: Kirk True) > Allow offset commit fetch to reuse previous request if partitions are a subset > -- > > Key: KAFKA-16966 > URL: https://issues.apache.org/jira/browse/KAFKA-16966 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.8.0 >Reporter: Kirk True >Priority: Major > Labels: consumer-threading-refactor > Fix For: 3.9.0 > > > In {{{}initWithCommittedOffsetsIfNeeded{}}}, the behavior of the existing > {{LegacyKafkaConsumer}} is to allow reuse only if the partitions for the > _current_ request equal those of the _previous_ request *exactly* > ([source|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java?rgh-link-date=2024-06-14T16%3A43%3A11Z#L927]). > That logic is the basis for the behavior used in the > {{{}AsyncKafkaConsumer{}}}. > The proposed change is to allow for request reuse if the partitions for the > _current_ request are a subset of those of the _previous_ request. This > introduces a subtle difference in behavior between the two {{Consumer}} > implementations, so we need to decide if we want to change both > implementations or just {{{}AsyncKafkaConsumer{}}}. Also, the specific case > that the request reuse logic solves is when the user has passed in a very low > timeout value in a tight {{poll()}} loop, which suggests the partitions > wouldn't be changing between those loops. -- This message was sent by Atlassian Jira (v8.20.10#820010)
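The difference between the current exact-match rule and the proposed subset rule comes down to `Set.equals` versus `Set.containsAll`. The sketch below shows only that comparison; the method names are hypothetical, partitions are modelled as strings rather than `TopicPartition` objects, and the real reuse check may involve more conditions than the partition set.

```java
import java.util.Set;

public class OffsetFetchReuseSketch {

    /** Existing rule as described in the ticket: reuse only on an exact match. */
    static boolean canReuseExact(Set<String> previous, Set<String> current) {
        return previous.equals(current);
    }

    /** Proposed rule: reuse when every current partition was in the previous request. */
    static boolean canReuseSubset(Set<String> previous, Set<String> current) {
        return previous.containsAll(current);
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("topic-0", "topic-1", "topic-2");
        Set<String> narrower = Set.of("topic-0", "topic-1");
        Set<String> different = Set.of("topic-0", "topic-3");

        System.out.println("exact, narrower:   " + canReuseExact(previous, narrower));
        System.out.println("subset, narrower:  " + canReuseSubset(previous, narrower));
        System.out.println("subset, different: " + canReuseSubset(previous, different));
    }
}
```

Under the subset rule a reused response may contain offsets for partitions the caller no longer asked about, so the caller would need to ignore the extras.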
[jira] [Created] (KAFKA-17208) replica_scale_test.py fails for new consumer
Kirk True created KAFKA-17208: - Summary: replica_scale_test.py fails for new consumer Key: KAFKA-17208 URL: https://issues.apache.org/jira/browse/KAFKA-17208 Project: Kafka Issue Type: Bug Components: clients, consumer, system tests Affects Versions: 3.8.0 Reporter: Kirk True Assignee: Kirk True Fix For: 4.0.0 {{replica_scale_test}}’s {{test_produce_consume}} fails when using a {{group_protocol}} of {{CONSUMER}}: {noformat} TimeoutError('replicas-consume-workload failed to finish in the expected amount of time.') Traceback (most recent call last): File "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/tests/runner_client.py", line 183, in _do_run data = self.run_test() File "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/tests/runner_client.py", line 243, in run_test return self.test_context.function(self.test) File "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/home/kafka/tests/kafkatest/tests/core/replica_scale_test.py", line 116, in test_produce_consume consume_workload.wait_for_done(timeout_sec=600) File "/home/kafka/tests/kafkatest/services/trogdor/trogdor.py", line 352, in wait_for_done wait_until(lambda: self.done(), File "/home/kafka/venv/lib/python3.8/site-packages/ducktape-0.11.1-py3.8.egg/ducktape/utils/util.py", line 58, in wait_until raise TimeoutError(err_msg() if callable(err_msg) else err_msg) from last_exception ducktape.errors.TimeoutError: replicas-consume-workload failed to finish in the expected amount of time. {noformat} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers
[ https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-17202: Priority: Minor (was: Major) > EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset > leaks consumers > -- > > Key: KAFKA-17202 > URL: https://issues.apache.org/jira/browse/KAFKA-17202 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: TengYao Chi >Priority: Minor > Labels: newbie > > This method creates a KafkaConsumer, but does not close it. > We can use a try-with-resources to ensure the consumer is closed prior to > returning or throwing from this function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
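The try-with-resources suggestion above can be illustrated with a small, self-contained sketch. It deliberately does not use the real `KafkaConsumer`, so it runs without a broker; `TrackedConsumer` and `readEndOffset` are hypothetical stand-ins that only record whether `close()` ran. Since `KafkaConsumer` is `Closeable`, the same pattern should carry over to the test method named in the ticket.

```java
public class TryWithResourcesSketch {

    /** Hypothetical stand-in for a consumer; it only tracks whether close() was called. */
    static class TrackedConsumer implements AutoCloseable {
        boolean closed = false;

        long endOffset(boolean fail) {
            if (fail) {
                throw new IllegalStateException("simulated failure while reading offsets");
            }
            return 42L;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Last consumer created by readEndOffset, kept only so callers can inspect it. */
    static TrackedConsumer lastConsumer;

    static long readEndOffset(boolean fail) {
        // The consumer is closed when this block exits, whether by return or by exception.
        try (TrackedConsumer consumer = new TrackedConsumer()) {
            lastConsumer = consumer;
            return consumer.endOffset(fail);
        }
    }

    public static void main(String[] args) {
        long offset = readEndOffset(false);
        System.out.println("offset=" + offset + " closed=" + lastConsumer.closed);
        try {
            readEndOffset(true);
        } catch (IllegalStateException e) {
            System.out.println("threw, closed=" + lastConsumer.closed);
        }
    }
}
```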
[jira] [Updated] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients
[ https://issues.apache.org/jira/browse/KAFKA-17207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-17207: Affects Version/s: 3.9.0 > ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking > clients > - > > Key: KAFKA-17207 > URL: https://issues.apache.org/jira/browse/KAFKA-17207 > Project: Kafka > Issue Type: Test > Components: connect >Affects Versions: 3.9.0 >Reporter: Greg Harris >Priority: Minor > Labels: newbie > > The testRequestTimeouts test deletes the internal config topic, putting the > connect workers into a bad state. When the test goes to clean up, it calls > DistributedHerder#stop, which waits for the herder executor to stop. This > times out, because the herder executor is blocked closing the > KafkaConfigBackingStore's producer. This log message gets printed: > {noformat} > [2024-07-26 11:52:50,817] ERROR Executor > java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = > 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not > terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat} > This effectively leaks the Kafka clients for the workers' internal topics, > and the herder executor thread. Instead, either the producer should not block > indefinitely on a missing topic, or the cluster state should be healed enough > for the producer to shut down cleanly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17207) ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients
Greg Harris created KAFKA-17207: --- Summary: ConnectWorkerIntegrationTest.testRequestTimeouts times out in stop(), leaking clients Key: KAFKA-17207 URL: https://issues.apache.org/jira/browse/KAFKA-17207 Project: Kafka Issue Type: Test Components: connect Reporter: Greg Harris The testRequestTimeouts test deletes the internal config topic, putting the connect workers into a bad state. When the test goes to clean up, it calls DistributedHerder#stop, which waits for the herder executor to stop. This times out, because the herder executor is blocked closing the KafkaConfigBackingStore's producer. This log message gets printed: {noformat} [2024-07-26 11:52:50,817] ERROR Executor java.util.concurrent.ThreadPoolExecutor@7ae97a58[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0] did not terminate in time (org.apache.kafka.common.utils.ThreadUtils:83){noformat} This effectively leaks the Kafka clients for the workers' internal topics, and the herder executor thread. Instead, either the producer should not block indefinitely on a missing topic, or the cluster state should be healed enough for the producer to shut down cleanly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state
[ https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-17112: --- Assignee: Ao Li > StreamThread shutdown calls completeShutdown only in CREATED state > -- > > Key: KAFKA-17112 > URL: https://issues.apache.org/jira/browse/KAFKA-17112 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 3.9.0 >Reporter: Ao Li >Assignee: Ao Li >Priority: Minor > > While running tests in `StreamThreadTest.java` in kafka/streams, I noticed > the test left many lingering threads. Though the class runs `shutdown` after > each test, the shutdown only executes `completeShutdown` if the StreamThread > is in CREATED state. See > [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231] > and > [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435] > > For example, you may run test > org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending > with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls > `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus, > `completeShutdown` is not called. The test creates three lingering threads: 2 > `StateUpdater` and 1 `TaskExecutor` > > This means that calls to `thread.shutdown` have no effect in > `StreamThreadTest.java`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
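The behavior described in the ticket can be reproduced with a reduced model. The sketch below is an assumption-laden simplification based only on the description above: `SketchThread` is hypothetical, it has just three states, and its run loop is never started, which mirrors a unit test that drives state changes by hand.

```java
public class ShutdownStateSketch {

    /** Reduced set of states; the real StreamThread has more. */
    enum State { CREATED, PARTITIONS_REVOKED, PENDING_SHUTDOWN }

    /** Hypothetical model of a thread object whose run loop was never started. */
    static class SketchThread {
        State state = State.CREATED;
        boolean resourcesReleased = false;

        void shutdown() {
            State oldState = state;
            state = State.PENDING_SHUTDOWN;
            // Cleanup happens here only for a thread that is still in CREATED.
            // In any other state the (unstarted) run loop is expected to do it.
            if (oldState == State.CREATED) {
                completeShutdown();
            }
        }

        void completeShutdown() {
            resourcesReleased = true;
        }
    }

    public static void main(String[] args) {
        SketchThread fresh = new SketchThread();
        fresh.shutdown();
        System.out.println("CREATED -> released=" + fresh.resourcesReleased);

        SketchThread revoked = new SketchThread();
        revoked.state = State.PARTITIONS_REVOKED; // state changed by the test, loop not running
        revoked.shutdown();
        System.out.println("PARTITIONS_REVOKED -> released=" + revoked.resourcesReleased);
    }
}
```

In the second case nothing ever releases the resources, which matches the lingering `StateUpdater` and `TaskExecutor` threads reported above.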
[jira] [Resolved] (KAFKA-17204) KafkaStreamsCloseOptionsIntegrationTest.before leaks AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-17204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-17204. - Fix Version/s: 3.9.0 Resolution: Fixed > KafkaStreamsCloseOptionsIntegrationTest.before leaks AdminClient > > > Key: KAFKA-17204 > URL: https://issues.apache.org/jira/browse/KAFKA-17204 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: TengYao Chi >Priority: Minor > Labels: newbie > Fix For: 3.9.0 > > > The before method creates an AdminClient, but this client is never closed. It > should be closed either in `after` or `closeCluster`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17206) Use v1 of LeaderChangeMessage when kraft.version is 1
Alyssa Huang created KAFKA-17206: Summary: Use v1 of LeaderChangeMessage when kraft.version is 1 Key: KAFKA-17206 URL: https://issues.apache.org/jira/browse/KAFKA-17206 Project: Kafka Issue Type: Task Reporter: Alyssa Huang [https://github.com/apache/kafka/pull/16668] introduced v1 of LCM but still uses v0 of the schema. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17203) StreamThread leaking producer instances
[ https://issues.apache.org/jira/browse/KAFKA-17203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-17203: --- Assignee: PoAn Yang > StreamThread leaking producer instances > --- > > Key: KAFKA-17203 > URL: https://issues.apache.org/jira/browse/KAFKA-17203 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: PoAn Yang >Priority: Minor > Labels: newbie > > When running > EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled > with the KAFKA-15845 leak testing extension, I > observed that this test appears to consistently leak StreamsProducers. The > producer is instantiated here: > {noformat} > This test contains a resource leak. Close the resources, or open a KAFKA > ticket and annotate this class with > @LeakTestingExtension.IgnoreAll("KAFKA-XYZ") > org.opentest4j.AssertionFailedError: This test contains a resource leak. > Close the resources, or open a KAFKA ticket and annotate this class with > @LeakTestingExtension.IgnoreAll("KAFKA-XYZ") > at > org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:98) > at > org.apache.kafka.common.network.LeakTestingExtension$All.afterAll(LeakTestingExtension.java:123) > at java.base/java.util.ArrayList.forEach(ArrayList.java:1511) > Caused by: org.opentest4j.AssertionFailedError: Leak check failed > at > org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:89) > at > org.apache.kafka.common.network.LeakTestingExtension.after(LeakTestingExtension.java:96) > ... 2 more > Suppressed: org.opentest4j.AssertionFailedError: AbstractSelector > instances left open > at > org.apache.kafka.common.utils.PredicateLeakTester.lambda$start$0(PredicateLeakTester.java:94) > at > org.apache.kafka.common.utils.LeakTester.lambda$combine$0(LeakTester.java:86) > ... 
3 more > Suppressed: java.lang.Exception: Opened sun.nio.ch.KQueueSelectorImpl > at > org.apache.kafka.common.utils.PredicateLeakTester.open(PredicateLeakTester.java:63) > at > org.apache.kafka.common.network.NetworkContextLeakTester$RecordingSelectorProvider.openSelector(NetworkContextLeakTester.java:135) > at > org.apache.kafka.common.network.TestNetworkContext$SelectorProviderDecorator.openSelector(TestNetworkContext.java:166) > at > org.apache.kafka.common.network.Selector.<init>(Selector.java:160) > at > org.apache.kafka.common.network.Selector.<init>(Selector.java:213) > at > org.apache.kafka.common.network.Selector.<init>(Selector.java:225) > at > org.apache.kafka.common.network.Selector.<init>(Selector.java:229) > at > org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:225) > at > org.apache.kafka.clients.ClientUtils.createNetworkClient(ClientUtils.java:163) > at > org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:526) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:465) > at > org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:297) > at > org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39) > at > org.apache.kafka.streams.processor.internals.StreamsProducer.<init>(StreamsProducer.java:142) > at > org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createRecordCollector(ActiveTaskCreator.java:196) > at > org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createActiveTask(ActiveTaskCreator.java:265) > at > org.apache.kafka.streams.processor.internals.ActiveTaskCreator.createTasks(ActiveTaskCreator.java:176) > at > org.apache.kafka.streams.processor.internals.TaskManager.createNewTasks(TaskManager.java:441) > at > org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:390) > at > 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1559) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(Consumer
[jira] [Commented] (KAFKA-17203) StreamThread leaking producer instances
[ https://issues.apache.org/jira/browse/KAFKA-17203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868995#comment-17868995 ] Matthias J. Sax commented on KAFKA-17203: - Thanks for your interest! Go for it! > StreamThread leaking producer instances > --- > > Key: KAFKA-17203 > URL: https://issues.apache.org/jira/browse/KAFKA-17203 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: PoAn Yang >Priority: Minor > Labels: newbie > > When running > EosIntegrationTest.shouldCheckpointRestoredOffsetsWhenClosingCleanDuringRestoringStateUpdaterEnabled > with the KAFKA-15845 leak testing extension, I > observed that this test appears to consistently leak StreamsProducers.
[jira] [Assigned] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers
[ https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-17202: --- Assignee: TengYao Chi > EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset > leaks consumers > -- > > Key: KAFKA-17202 > URL: https://issues.apache.org/jira/browse/KAFKA-17202 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: TengYao Chi >Priority: Major > Labels: newbie > > This method creates a KafkaConsumer, but does not close it. > We can use a try-with-resources to ensure the consumer is closed prior to > returning or throwing from this function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
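A minimal sketch of the try-with-resources suggestion in this ticket follows. It is not the actual EosIntegrationTest code: `FakeConsumer` is a hypothetical stand-in for `KafkaConsumer` (which is closeable and can be used in the same way), and `readEndOffset` and the value 42 are placeholders. The point is that `close()` runs on both the normal return path and the exception path.

```java
// Hypothetical illustration; FakeConsumer stands in for a real client such as KafkaConsumer.
final class TryWithResourcesSketch {

    /** Records whether close() ran so the behaviour can be observed. */
    static final class FakeConsumer implements AutoCloseable {
        boolean closed = false;

        long endOffset() {
            return 42L; // placeholder value, not a real offset
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Kept only so callers can inspect the most recently created consumer. */
    static FakeConsumer lastConsumer;

    /** The consumer is closed before this method returns or throws. */
    static long readEndOffset(boolean fail) {
        try (FakeConsumer consumer = new FakeConsumer()) {
            lastConsumer = consumer;
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            return consumer.endOffset();
        }
    }

    public static void main(String[] args) {
        System.out.println(readEndOffset(false) + ", closed=" + lastConsumer.closed);
        try {
            readEndOffset(true);
        } catch (IllegalStateException e) {
            System.out.println("threw, closed=" + lastConsumer.closed);
        }
    }
}
```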
[jira] [Commented] (KAFKA-17202) EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset leaks consumers
[ https://issues.apache.org/jira/browse/KAFKA-17202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868993#comment-17868993 ] Matthias J. Sax commented on KAFKA-17202: - Thanks for your interest in picking this up. Go for it! > EosIntegrationTest.verifyChangelogMaxRecordOffsetMatchesCheckpointedOffset > leaks consumers > -- > > Key: KAFKA-17202 > URL: https://issues.apache.org/jira/browse/KAFKA-17202 > Project: Kafka > Issue Type: Test > Components: streams >Affects Versions: 3.9.0 >Reporter: Greg Harris >Assignee: TengYao Chi >Priority: Major > Labels: newbie > > This method creates a KafkaConsumer, but does not close it. > We can use a try-with-resources to ensure the consumer is closed prior to > returning or throwing from this function. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17101) Mirror maker internal topics cleanup policy changes to 'delete' from 'compact'
[ https://issues.apache.org/jira/browse/KAFKA-17101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868990#comment-17868990 ] Chris Egerton commented on KAFKA-17101: --- Spoke too soon–looks like the test failures we're seeing are due to a bug in our testing logic, not Kafka (of course). Still, it'd be useful to know more about your Kafka cluster in case the issue you're experiencing is due to a broker bug. > Mirror maker internal topics cleanup policy changes to 'delete' from > 'compact' > --- > > Key: KAFKA-17101 > URL: https://issues.apache.org/jira/browse/KAFKA-17101 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.4.1, 3.5.1, 3.6.1 >Reporter: kaushik srinivas >Priority: Major > > Scenario/Setup details > Kafka cluster 1: 3 replicas > Kafka cluster 2: 3 replicas > MM1 moving data from cluster 1 to cluster 2 > MM2 moving data from cluster 2 to cluster 1 > Sometimes, after a reboot of Kafka cluster 1 and the MM1 instance, we observe > MM failing to come up with the exception below: > {code:java} > {"message":"DistributedHerder-connect-1-1 - > org.apache.kafka.connect.runtime.distributed.DistributedHerder - [Worker > clientId=connect-1, groupId=site1-mm2] Uncaught exception in herder work > thread, exiting: "}} > org.apache.kafka.common.config.ConfigException: Topic > 'mm2-offsets.site1.internal' supplied via the 'offset.storage.topic' property > is required to have 'cleanup.policy=compact' to guarantee consistency and > durability of source connector offsets, but found the topic currently has > 'cleanup.policy=delete'. Continuing would likely result in eventually losing > source connector offsets and problems restarting this Connect cluster in the > future. Change the 'offset.storage.topic' property in the Connect worker > configurations to use a topic with 'cleanup.policy=compact'. {code} > Once the topic is altered to a cleanup policy of compact, MM works just fine. 
> This is happening on our setups sporadically and across a variety of > scenarios. We have not been successful in identifying the exact reproduction steps as > of now. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2
[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868989#comment-17868989 ] Greg Harris commented on KAFKA-17186: - I think you want `.scheduled.rebalance.max.delay.ms`, without the "distributed". > Cannot receive message after stopping Source Mirror Maker 2 > --- > > Key: KAFKA-17186 > URL: https://issues.apache.org/jira/browse/KAFKA-17186 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.1 > Environment: Source Kafka Cluster per Node: > CPU(s): 32 > Memory: 32G/1.1G free > Target Kafka Cluster standalone Node: > CPU(s): 24 > Memory: 30G/909M free > Kafka Version 3.7 > Mirrormaker Version 3.7.1 >Reporter: George Yang >Priority: Major > Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out > > > Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node > 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. > Currently, a service on node 1 in Data Center A acts as a producer sending > messages to the `myTest` topic. A service in Data Center B acts as a consumer > listening to `A.myTest`. > The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer > in Data Center B ceases to receive messages. Even after restarting MM2 in > Data Center A, the consumer in Data Center B still does not receive messages > until approximately 5 minutes later when a rebalance occurs, at which point > it begins receiving messages again. 
> > [Logs From Consumer on Data Center B] > [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] > Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242) > [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] > (Re-)joining group > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604) > [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] > Successfully joined group with generation Generation\{generationId=52, > memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] > Successfully synced group in generation Generation\{generationId=52, > memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:842) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Joined > group at generation 52 with protocol version 2 and got assignment: > Assignment\{error=0, leader='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', > leaderUrl='NOTUSED', offset=1360, connectorIds=[MirrorCheckpointConnector], > taskIds=[MirrorCheckpointConnector-0, MirrorCheckpointConnector-1, > MirrorCheckpointConnector-2], revokedConnectorIds=[], 
revokedTaskIds=[], > delay=0} with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:2580) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Starting > connectors and tasks using config offset 1360 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1921) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] Finished > starting connectors and tasks > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1950) > [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] > Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242) > [2024-07-23 17:29:26,883] INFO [Worker clientId=A->B, groupId=A-mm2] > (Re-)joining group > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604) > [2024-07-23 17:29:26,890] INFO [Worker clientId=A->B, groupId=A-mm2] > Successfully joined group with generati
[jira] [Commented] (KAFKA-17112) StreamThread shutdown calls completeShutdown only in CREATED state
[ https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868965#comment-17868965 ] Ao Li commented on KAFKA-17112: --- Hi [~cadonna], I've submitted a patch. Could you please take a look at it when you have time? > StreamThread shutdown calls completeShutdown only in CREATED state > -- > > Key: KAFKA-17112 > URL: https://issues.apache.org/jira/browse/KAFKA-17112 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 3.9.0 >Reporter: Ao Li >Priority: Minor > > While running tests in `StreamThreadTest.java` in kafka/streams, I noticed > the test left many lingering threads. Though the class runs `shutdown` after > each test, the shutdown only executes `completeShutdown` if the StreamThread > is in CREATED state. See > [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231] > and > [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435] > > For example, you may run test > org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending > with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls > `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus, > `completeShutdown` is not called. The test creates three lingering threads: 2 > `StateUpdater` and 1 `TaskExecutor`. > > This means that calls to `thread.shutdown` have no effect in > `StreamThreadTest.java`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17033) Consider replacing the directory id Optional with just Uuid
[ https://issues.apache.org/jira/browse/KAFKA-17033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868940#comment-17868940 ] TengYao Chi commented on KAFKA-17033: - Hi [~jsancio], is this ticket talking about the `directoryId` field of `MetaProperties` and `ReplicaKey`? > Consider replacing the directory id Optional with just Uuid > - > > Key: KAFKA-17033 > URL: https://issues.apache.org/jira/browse/KAFKA-17033 > Project: Kafka > Issue Type: Sub-task >Reporter: José Armando García Sancio >Priority: Major > > One way to handle this is to introduce a type called `DirectoryId` that just > encapsulates a Uuid but is better able to handle the Uuid.ZERO_UUID case. -- This message was sent by Atlassian Jira (v8.20.10#820010)
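One possible shape for the `DirectoryId` idea in this ticket is sketched below. This is purely hypothetical and not taken from any Kafka patch: `java.util.UUID` stands in for Kafka's `Uuid`, the all-zero UUID stands in for `Uuid.ZERO_UUID`, and treating that value as "no directory id assigned" is an assumption made for illustration. The method names `of`, `fromOptional`, and `isAssigned` are invented here.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch; java.util.UUID is used in place of Kafka's Uuid type.
final class DirectoryIdSketch {

    static final class DirectoryId {
        // Assumption: the all-zero value plays the role of Uuid.ZERO_UUID.
        private static final UUID ZERO = new UUID(0L, 0L);
        static final DirectoryId UNASSIGNED = new DirectoryId(ZERO);

        private final UUID value;

        private DirectoryId(UUID value) {
            this.value = value;
        }

        /** Wraps a raw id; the zero value maps to the shared UNASSIGNED instance. */
        static DirectoryId of(UUID value) {
            Objects.requireNonNull(value, "value");
            return ZERO.equals(value) ? UNASSIGNED : new DirectoryId(value);
        }

        /** Bridges from an Optional-based API: empty becomes UNASSIGNED. */
        static DirectoryId fromOptional(Optional<UUID> maybeValue) {
            return maybeValue.map(DirectoryId::of).orElse(UNASSIGNED);
        }

        boolean isAssigned() {
            return !ZERO.equals(value);
        }

        UUID value() {
            return value;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof DirectoryId && value.equals(((DirectoryId) o).value);
        }

        @Override
        public int hashCode() {
            return value.hashCode();
        }

        @Override
        public String toString() {
            return isAssigned() ? "DirectoryId(" + value + ")" : "DirectoryId(unassigned)";
        }
    }

    public static void main(String[] args) {
        System.out.println(DirectoryId.fromOptional(Optional.empty()));
        System.out.println(DirectoryId.of(UUID.randomUUID()).isAssigned());
    }
}
```

With a wrapper like this, callers would ask `isAssigned()` instead of checking both an empty Optional and a zero value.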
[jira] [Comment Edited] (KAFKA-17186) Cannot receive message after stopping Source Mirror Maker 2
[ https://issues.apache.org/jira/browse/KAFKA-17186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868842#comment-17868842 ] George Yang edited comment on KAFKA-17186 at 7/26/24 11:21 AM: --- Thanks [~gharris1727] again. Sorry for the confusion, "affect the old Kafka cluster" just means the two Kafka clusters are located in different data centers, please ignore the word ‘old’. To test the rebalance delay, I attempted to set the parameter {{scheduled.rebalance.max.delay.ms=2}} in {{{}connect-mirror-maker.properties{}}}. However, the log file {{mirrormaker.out}} continued to show the old value of {{scheduled.rebalance.max.delay.ms = 30}} under the DistributedConfig section. Subsequently, I tried setting it as follows: A. {{distributed.scheduled.rebalance.max.delay.ms=2}} B. {{distributed.scheduled.rebalance.max.delay.ms=2}} Nevertheless, {{mirrormaker.out}} still indicates the default value of 30. How should I configure this parameter to make it effective? was (Author: JIRAUSER302264): Thanks [~gharris1727] again. Sorry for the confusion, "affect the old Kafka cluster" is just mean the two Kafka clusters locate in each data center, please ignore the word old. To test the rebalance delay, I attempted to set the parameter {{scheduled.rebalance.max.delay.ms=2}} in {{{}connect-mirror-maker.properties{}}}. However, the log file {{mirrormaker.out}} continued to show the old value of {{scheduled.rebalance.max.delay.ms = 30}} under the DistributedConfig section. Subsequently, I tried setting it as follows: A. {{distributed.scheduled.rebalance.max.delay.ms=2}} B. {{distributed.scheduled.rebalance.max.delay.ms=2}} Nevertheless, {{mirrormaker.out}} still indicates the default value of 30. How should I configure this parameter to make it effective? 
> Cannot receive message after stopping Source Mirror Maker 2 > --- > > Key: KAFKA-17186 > URL: https://issues.apache.org/jira/browse/KAFKA-17186 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.7.1 > Environment: Source Kafka Cluster per Node: > CPU(s): 32 > Memory: 32G/1.1G free > Target Kafka Cluster standalone Node: > CPU(s): 24 > Memory: 30G/909M free > Kafka Version 3.7 > Mirrormaker Version 3.7.1 >Reporter: George Yang >Priority: Major > Attachments: image-2024-07-25-14-14-21-327.png, mirrorMaker.out > > > Deploy nodes 1, 2, and 3 in Data Center A, with MM2 service deployed on node > 1. Deploy node 1 in Data Center B, with MM2 service also deployed on node 1. > Currently, a service on node 1 in Data Center A acts as a producer sending > messages to the `myTest` topic. A service in Data Center B acts as a consumer > listening to `A.myTest`. > The issue arises when MM2 on node 1 in Data Center A is stopped: the consumer > in Data Center B ceases to receive messages. Even after restarting MM2 in > Data Center A, the consumer in Data Center B still does not receive messages > until approximately 5 minutes later when a rebalance occurs, at which point > it begins receiving messages again. 
> > [Logs From Consumer on Data Center B] > [2024-07-23 17:29:17,270] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 185 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:19,189] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 365 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:22,271] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 186 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:24,193] INFO [MirrorCheckpointConnector|worker] refreshing > consumer groups took 369 ms (org.apache.kafka.connect.mirror.Scheduler:95) > [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] > Rebalance started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:242) > [2024-07-23 17:29:25,377] INFO [Worker clientId=B->A, groupId=B-mm2] > (Re-)joining group > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:604) > [2024-07-23 17:29:25,386] INFO [Worker clientId=B->A, groupId=B-mm2] > Successfully joined group with generation Generation\{generationId=52, > memberId='B->A-adc19038-a8b6-40fb-9bf6-249f866944ab', protocol='sessioned'} > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:665) > [2024-07-23 17:29:25,390] INFO [Worker clientId=B->A, groupId=B-mm2] > Successfully synced group in generation Generation\{generationId=52, >
[jira] [Commented] (KAFKA-17201) SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks sockets and threads
[ https://issues.apache.org/jira/browse/KAFKA-17201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868902#comment-17868902 ] TengYao Chi commented on KAFKA-17201: - Hello [~gharris1727], may I take this ticket? Many thanks :) > SelectorTest.testInboundConnectionsCountInConnectionCreationMetric leaks > sockets and threads > > > Key: KAFKA-17201 > URL: https://issues.apache.org/jira/browse/KAFKA-17201 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.9.0 >Reporter: Greg Harris >Priority: Minor > Labels: flaky-test, newbie > > This test creates multiple Threads which in turn open sockets. It is possible > that these threads and sockets outlive the test itself, as the threads are > not joined before the test completes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
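The general remedy implied by this description, joining every thread a test starts before the test returns, might look like the sketch below. It is not the SelectorTest code: `JoinThreadsSketch` and `runAndJoin` are hypothetical names, and the worker body is a trivial counter increment rather than socket work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of joining helper threads before a test method returns.
final class JoinThreadsSketch {

    /**
     * Starts threadCount workers, then joins each one with a bounded wait.
     * Fails loudly if a worker is still alive after the timeout instead of
     * silently letting it outlive the caller. Returns how many workers completed.
     */
    static int runAndJoin(int threadCount, long joinTimeoutMs) {
        AtomicInteger completed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(completed::incrementAndGet, "worker-" + i);
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join(joinTimeoutMs); // timeout must be positive; join(0) would wait forever
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (t.isAlive()) {
                throw new IllegalStateException(t.getName() + " did not finish in time");
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed workers: " + runAndJoin(4, 5_000));
    }
}
```

Any sockets opened by the workers would still need to be closed by the workers themselves; joining only guarantees the threads have finished.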
[jira] [Updated] (KAFKA-17205) Allow topic config validation in controller level in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-17205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-17205: -- Summary: Allow topic config validation in controller level in KRaft mode (was: Allow topic config validation in controller level ) > Allow topic config validation in controller level in KRaft mode > --- > > Key: KAFKA-17205 > URL: https://issues.apache.org/jira/browse/KAFKA-17205 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Allow topic config validation in controller level. This is required because > we need to fail the invalid config change before it is written into metadata > log, especially for tiered storage feature. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17205) Allow topic config validation in controller level in KRaft mode
[ https://issues.apache.org/jira/browse/KAFKA-17205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-17205: -- Description: Allow topic config validation in controller level. This is required because we need to fail the invalid config change before it is written into metadata log, especially for tiered storage feature. Note: This ticket only changes for KRaft mode. was:Allow topic config validation in controller level. This is required because we need to fail the invalid config change before it is written into metadata log, especially for tiered storage feature. > Allow topic config validation in controller level in KRaft mode > --- > > Key: KAFKA-17205 > URL: https://issues.apache.org/jira/browse/KAFKA-17205 > Project: Kafka > Issue Type: Sub-task >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > Allow topic config validation in controller level. This is required because > we need to fail the invalid config change before it is written into metadata > log, especially for tiered storage feature. > > Note: This ticket only changes for KRaft mode. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
[ https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17868895#comment-17868895 ] Sagar Rao edited comment on KAFKA-8115 at 7/26/24 8:52 AM: --- Failed again in [this|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16628/5/#showFailuresLink] build. {code:java} java.util.concurrent.TimeoutException: testTaskRequestWithOldStartMsGetsUpdated() timed out after 240 seconds at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) Suppressed: java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1765) at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:780) at org.apache.kafka.trogdor.coordinator.TaskManager.waitForShutdown(TaskManager.java:686) at org.apache.kafka.trogdor.coordinator.Coordinator.waitForShutdown(Coordinator.java:131) at org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:288) at org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:594) at java.base/java.lang.reflect.Method.invoke(Method.java:580) ... 2 more {code} was (Author: sagarrao): Failed again in [this|https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-16628/5/#showFailuresLink] build. 
``` java.util.concurrent.TimeoutException: testTaskRequestWithOldStartMsGetsUpdated() timed out after 240 seconds at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) at java.base/java.util.ArrayList.forEach(ArrayList.java:1596) Suppressed: java.lang.InterruptedException at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1765) at java.base/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1475) at java.base/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:780) at org.apache.kafka.trogdor.coordinator.TaskManager.waitForShutdown(TaskManager.java:686) at org.apache.kafka.trogdor.coordinator.Coordinator.waitForShutdown(Coordinator.java:131) at org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:288) at org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:594) at java.base/java.lang.reflect.Method.invoke(Method.java:580) ... 2 more ``` > Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated > --- > > Key: KAFKA-8115 > URL: https://issues.apache.org/jira/browse/KAFKA-8115 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.3.0 >Reporter: Matthias J. 
Sax >Assignee: Greg Harris >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/] > {quote}org.junit.runners.model.TestTimedOutException: test timed out after > 12 milliseconds at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native > Method) at > java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234) > at > java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123) > at > java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454) > at > java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709) > at > app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157) > at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123) > at > app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285) > at > app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596) > at > java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.1/ja