[jira] [Commented] (KAFKA-16223) Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest
[ https://issues.apache.org/jira/browse/KAFKA-16223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817597#comment-17817597 ] Chaitanya Mukka commented on KAFKA-16223: - Hi [~hgeraldino]! I hope you are doing well. Are you actively working on this task? I have a local branch with changes that I was working on for the same. I wanted to check if you have already finalized your changes for this, or if you are open to me picking it up. Thanks! > Replace EasyMock and PowerMock with Mockito for KafkaConfigBackingStoreTest > --- > > Key: KAFKA-16223 > URL: https://issues.apache.org/jira/browse/KAFKA-16223 > Project: Kafka > Issue Type: Sub-task > Components: connect > Reporter: Hector Geraldino > Assignee: Hector Geraldino > Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Fix KafkaAdminClientTest.testClientInstanceId [kafka]
dajac merged PR #15370: URL: https://github.com/apache/kafka/pull/15370 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-16259) Immutable MetadataCache to improve client performance
[ https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Satish Duggana reassigned KAFKA-16259: -- Assignee: Zhifeng Chen > Immutable MetadataCache to improve client performance > - > > Key: KAFKA-16259 > URL: https://issues.apache.org/jira/browse/KAFKA-16259 > Project: Kafka > Issue Type: Improvement > Components: clients > Affects Versions: 2.8.0 > Reporter: Zhifeng Chen > Assignee: Zhifeng Chen > Priority: Major > Attachments: image-2024-02-14-12-11-07-366.png > > > TL;DR: a Kafka client produce latency issue was identified, caused by > synchronized lock contention on metadata cache reads/writes in the native > Kafka producer. > Trigger condition: a producer needs to produce to a large number of topics, > such as in kafka rest-proxy. > > > What is the producer metadata cache: > the Kafka producer maintains an in-memory copy of cluster metadata to avoid > fetching metadata on every produce, reducing latency. > > What is the synchronized lock contention problem: > the Kafka producer metadata cache is a *mutable* object; reads and writes are > guarded by a synchronized lock, which means that while the metadata cache is > being updated, all read requests are blocked. > Topic metadata expiration frequency increases linearly with the number of > topics. In a Kafka cluster with a large number of topic partitions, topic > metadata expiration and refresh trigger high-frequency metadata updates. When > reads are blocked by an update, producer threads stall, causing high produce > latency.
> > *Proposed solution* > TL;DR: optimize the performance of metadata cache reads in the native Kafka > producer with a copy-on-write strategy. > What is the copy-on-write strategy: > it reduces synchronized lock contention by making the object immutable and > always creating a new instance on update; since the object is immutable, read > operations never wait, so produce latency is reduced significantly. > Besides performance, it also protects the metadata cache from unexpected > modification, reducing the occurrence of bugs due to incorrect > synchronization. > > {*}Test result{*}: > Environment: Kafka-rest-proxy > Client version: 2.8.0 > Number of topic partitions: 250k > Test results show a 90%+ latency reduction on the test cluster > !image-2024-02-14-12-11-07-366.png! > P99 produce latency on deployed instances reduced from 200ms -> 5ms (the > upper part shows latency after the improvement, the lower part before) > *Dump showing details of the problem* > Threads waiting to acquire the lock: > Kafka-rest-proxy-jetty-thread-pool-199 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-200 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-202 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-203 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-204 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-205 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-207 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-212 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-214 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-215 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-217 waiting to acquire [ 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-218 waiting to acquire [ 0x7f77d70121a0 ] > 
Kafka-rest-proxy-jetty-thread-pool-219 waiting to acquire [ > 0x7f77d70121a0 ] > Kafka-rest-proxy-jetty-thread-pool-222 waiting to acquire [ > 0x7f77d70121a0 ] > ... > at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111) > at > org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019) > at > org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144) > at > io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39) > at > io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117) > Threads holding the lock: > kafka-producer-network-thread | kafka-rest-proxy running, holding [ > 0x7f77d70121a0 ] > at > java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195) > at > java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655) > at > java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484) > at > org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.jav
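The copy-on-write approach described in the ticket can be sketched as follows. This is a minimal illustration of the technique only; the class and method names are hypothetical, not Kafka's actual MetadataCache API. Readers dereference an immutable snapshot through a volatile field and never block; writers serialize among themselves and publish a fresh copy.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a copy-on-write metadata cache (illustrative names,
// not Kafka's actual MetadataCache implementation).
final class CopyOnWriteCache {
    // volatile gives readers a consistent immutable snapshot without locking
    private volatile Map<String, Integer> snapshot = Collections.emptyMap();

    // Lock-free read: just dereference the current immutable map
    Integer partitionsFor(String topic) {
        return snapshot.get(topic);
    }

    // Writers serialize among themselves, but never block readers
    synchronized void update(String topic, int partitions) {
        Map<String, Integer> next = new HashMap<>(snapshot); // copy
        next.put(topic, partitions);
        snapshot = Collections.unmodifiableMap(next);        // publish atomically
    }
}
```

Under this scheme a read during a concurrent update simply sees the old snapshot instead of blocking, which is why the jetty threads in the dump above would no longer queue behind the network thread.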
Re: [PR] improve TopicCommandIntegrationTest to be less flaky [kafka]
showuon commented on PR #14891: URL: https://github.com/apache/kafka/pull/14891#issuecomment-1945384353 @jolshan, do you want to have another look? If not, I'm going to merge this PR this week. Thanks.
Re: [PR] PROPOSAL: support async event execution in group coordinator [kafka]
github-actions[bot] commented on PR #14705: URL: https://github.com/apache/kafka/pull/14705#issuecomment-1945310660 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]
mdedetrich commented on PR #15341: URL: https://github.com/apache/kafka/pull/15341#issuecomment-194527 > Thanks for taking the comments into consideration. One other suggestion is that we can have a look into #13375 to see what is common between both `EmbeddedKafkaCluster` in streams and connect and see if we can have one common `EmbeddedKafkaCluster` between both of them, and later move connect to use it. This could also be a follow-up JIRA if we want to keep this one simple. Either way I'm happy to lend a hand in the connect part if needed! Cool, I will look into it.
Re: [PR] KAFKA-15625: Do not flush global state store at each commit [kafka]
jeffkbkim commented on code in PR #15361: URL: https://github.com/apache/kafka/pull/15361#discussion_r1490286153

## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateConsumerTest.java:

@@ -161,6 +140,10 @@ public void update(final ConsumerRecord record) {
         updatedPartitions.put(tp, updatedPartitions.get(tp) + 1);
     }

+    @Override
+    public void maybeCheckpoint() {
+        flushState();
+    }
 }
 }

Review Comment: nit: can we add a newline?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:

@@ -150,5 +150,11 @@ private void initTopology() {
     }
 }

+    @Override
+    public void maybeCheckpoint() {
+        if (StateManagerUtil.checkpointNeeded(false, stateMgr.changelogOffsets(), offsets)) {

Review Comment: [AbstractTask#maybeCheckpoint()](https://github.com/apache/kafka/blob/a1f3c6d16061566a4f53c72a95e2679b8ee229e0/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java#L97) passes in `stateMgr.changelogOffsets` as the new offset snapshot in `StateManagerUtil#checkpointNeeded`. Why is the behavior reversed here? Is it because `offsets` is the object that is updated on every `update()`? What about `ProcessorStateManager#changelogOffsets`, would that not have the latest?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:

@@ -259,19 +251,14 @@ void initialize() {
         for (final Map.Entry entry : partitionOffsets.entrySet()) {
             globalConsumer.seek(entry.getKey(), entry.getValue());
         }
-        lastFlush = time.milliseconds();
     }

     void pollAndUpdate() {
         final ConsumerRecords received = globalConsumer.poll(pollTime);
         for (final ConsumerRecord record : received) {
             stateMaintainer.update(record);
         }
-        final long now = time.milliseconds();
-        if (now - flushInterval >= lastFlush) {
-            stateMaintainer.flushState();
-            lastFlush = now;
-        }
+        stateMaintainer.maybeCheckpoint();

Review Comment: should we still keep an interval-based flush or is the delta check sufficient?
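The delta-based checkpoint gate debated in the review above can be sketched roughly like this. The class name and threshold constant are assumptions for illustration, not the actual StateManagerUtil or GlobalStateUpdateTask code: the idea is simply to checkpoint only when offsets have advanced by at least some delta since the last checkpoint, rather than on a wall-clock interval.

```java
// Hypothetical sketch of a delta-based checkpoint decision (illustrative
// names and threshold; not Kafka Streams' actual implementation).
final class CheckpointGate {
    // Assumed threshold: checkpoint after this many new records/offsets
    private static final long OFFSET_DELTA_THRESHOLD = 10_000L;
    private long lastCheckpointedOffset = 0L;

    // Returns true when the caller should flush state and write a checkpoint
    boolean maybeCheckpoint(long currentOffset) {
        if (currentOffset - lastCheckpointedOffset >= OFFSET_DELTA_THRESHOLD) {
            lastCheckpointedOffset = currentOffset;
            return true;
        }
        return false;
    }
}
```

The reviewer's question amounts to whether this delta check alone suffices, or whether a time-based fallback is still needed for low-traffic stores whose offsets advance too slowly to ever cross the threshold.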
[jira] [Updated] (KAFKA-16260) Remove window.size.ms from StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Component/s: streams > Remove window.size.ms from StreamsConfig > > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Lucia Cerchie > Priority: Major > > {{window.size.ms}} is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client.
[jira] [Updated] (KAFKA-16260) Remove window.size.ms from StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-16260: Labels: needs-kip (was: ) > Remove window.size.ms from StreamsConfig > > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Lucia Cerchie > Priority: Major > Labels: needs-kip > > {{window.size.ms}} is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client.
[jira] [Assigned] (KAFKA-16260) Remove window.size.ms from StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-16260: --- Assignee: Lucia Cerchie > Remove window.size.ms from StreamsConfig > > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Lucia Cerchie > Assignee: Lucia Cerchie > Priority: Major > Labels: needs-kip > > {{window.size.ms}} is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client.
Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]
jolshan commented on PR #15359: URL: https://github.com/apache/kafka/pull/15359#issuecomment-1945185518 I will wait a day or two to see if @cmccabe has any comments. If not, I will merge. 👍
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
ijuma commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1945066461 @hachikuji @msn-tldr #15376 looks related.
Re: [PR] KAFKA-16259 Immutable MetadataCache to improve client performance [kafka]
ijuma commented on PR #15376: URL: https://github.com/apache/kafka/pull/15376#issuecomment-1945065055 Thanks for the PR. Is this similar to what was done in #15323?
Re: [PR] KAFKA-15770: IQv2 must return immutable position [kafka]
mjsax commented on PR #15219: URL: https://github.com/apache/kafka/pull/15219#issuecomment-1945045218 > Do we need to lock the implementations of AbstractDualSchemaRocksDBSegmentedBytesStore.getWriteBatches ? Outstanding catch! I believe yes. Updated the PR.
[jira] [Updated] (KAFKA-16259) Immutable MetadataCache to improve client performance
[ https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhifeng Chen updated KAFKA-16259: - Description: TL;DR: a Kafka client produce latency issue was identified, caused by synchronized lock contention on metadata cache reads/writes in the native Kafka producer. Trigger condition: a producer needs to produce to a large number of topics, such as in kafka rest-proxy. What is the producer metadata cache: the Kafka producer maintains an in-memory copy of cluster metadata to avoid fetching metadata on every produce, reducing latency. What is the synchronized lock contention problem: the Kafka producer metadata cache is a *mutable* object; reads and writes are guarded by a synchronized lock, which means that while the metadata cache is being updated, all read requests are blocked. Topic metadata expiration frequency increases linearly with the number of topics. In a Kafka cluster with a large number of topic partitions, topic metadata expiration and refresh trigger high-frequency metadata updates. When reads are blocked by an update, producer threads stall, causing high produce latency. *Proposed solution* TL;DR: optimize the performance of metadata cache reads in the native Kafka producer with a copy-on-write strategy. What is the copy-on-write strategy: it reduces synchronized lock contention by making the object immutable and always creating a new instance on update; since the object is immutable, read operations never wait, so produce latency is reduced significantly. Besides performance, it also protects the metadata cache from unexpected modification, reducing the occurrence of bugs due to incorrect synchronization. {*}Test result{*}: Environment: Kafka-rest-proxy Client version: 2.8.0 Number of topic partitions: 250k Test results show a 90%+ latency reduction on the test cluster !image-2024-02-14-12-11-07-366.png! 
P99 produce latency on deployed instances reduced from 200ms -> 5ms (the upper part shows latency after the improvement, the lower part before) *Dump showing details of the problem* Threads waiting to acquire the lock: Kafka-rest-proxy-jetty-thread-pool-199 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-200 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-202 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-203 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-204 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-205 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-207 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-212 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-214 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-215 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-217 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-218 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-219 waiting to acquire [ 0x7f77d70121a0 ] Kafka-rest-proxy-jetty-thread-pool-222 waiting to acquire [ 0x7f77d70121a0 ] ... 
at org.apache.kafka.clients.Metadata.fetch(Metadata.java:111) at org.apache.kafka.clients.producer.KafkaProducer.waitOnMetadata(KafkaProducer.java:1019) at org.apache.kafka.clients.producer.KafkaProducer.partitionsFor(KafkaProducer.java:1144) at io.confluent.kafkarest.producer.internal.MetadataImpl.maybeUpdate(MetadataImpl.java:39) at io.confluent.kafkarest.producer.ResilientProducer.send(ResilientProducer.java:117) Threads hold the lock kafka-producer-network-thread | kafka-rest-proxyrunning , holding [ 0x7f77d70121a0 ] at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(java.base@11.0.18/ArrayList.java:1655) at java.util.stream.AbstractPipeline.copyInto(java.base@11.0.18/AbstractPipeline.java:484) at org.apache.kafka.common.requests.MetadataResponse.convertToNodeArray(MetadataResponse.java:162) at org.apache.kafka.common.requests.MetadataResponse.toPartitionInfo(MetadataResponse.java:152) at org.apache.kafka.clients.MetadataCache.lambda$computeClusterView$1(MetadataCache.java:177) at org.apache.kafka.clients.MetadataCache$$Lambda$695/0x7f75da3ddcb0.apply(Unknown Source) at java.util.stream.ReferencePipeline$3$1.accept(java.base@11.0.18/ReferencePipeline.java:195) at org.apache.kafka.clients.MetadataCache.computeClusterView(MetadataCache.java:178) at java.lang.Thread.run(java.base@11.0.18/Thread.java:829) was: TL;DR, A Kafka client produce latency issue is identified caused by synchronized lock contention of metadata cache read/write in the native kafka producer.
[PR] KAFKA-16259 Immutable MetadataCache to improve client performance [kafka]
ericzhifengchen opened a new pull request, #15376: URL: https://github.com/apache/kafka/pull/15376 *More detailed description of your change: the current MetadataCache is partially (but not fully) immutable, so reads/writes require synchronization, which led to high produce latency with a large number of topics in a Kafka cluster. This change improves the performance of metadata cache reads in the native Kafka producer with a copy-on-write strategy. What is the copy-on-write strategy: it reduces synchronized lock contention by making the object immutable and always creating a new instance on update; since the object is immutable, read operations never wait, so produce latency is reduced significantly. Besides performance, it also protects the metadata cache from unexpected modification, reducing the occurrence of bugs due to incorrect synchronization. *Summary of testing strategy (including rationale): added a unit test to cover concurrent read/write; ran in production at large scale for over 3 months. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
AndrewJSchofield commented on code in PR #15375: URL: https://github.com/apache/kafka/pull/15375#discussion_r1490086839

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:

@@ -731,10 +729,29 @@ private boolean targetAssignmentReconciled() {
         return currentAssignment.equals(currentTargetAssignment);
     }

+    /**
+     * @return True if the member should not send heartbeats, which would be one of the following
+     * cases:
+     *
+     * Member is not subscribed to any topics
+     * Member has received a fatal error in a previous heartbeat response
+     * Member is stale, meaning that it has left the group due to expired poll timer
+     *
+     */
     @Override
     public boolean shouldSkipHeartbeat() {
         MemberState state = state();
-        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL;
+        return state == MemberState.UNSUBSCRIBED || state == MemberState.FATAL || state == MemberState.STALE;
+    }
+
+    /**
+     * @return True if the member is preparing to leave the group (waiting for callbacks), or
+     * leaving (sending last heartbeat). This is used to skip proactively leaving the group when
+     * the consumer poll timer expires.
+     */
+    public boolean isLeavingGroup() {
+        MemberState state = state();
+        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
     }

     /**

Review Comment: I think that technically, it doesn't take the *user* to poll in order to rejoin. The code polls more frequently if the user's poll timeout is long enough.
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java:

@@ -188,18 +188,18 @@ public HeartbeatRequestManager(
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
         if (!coordinatorRequestManager.coordinator().isPresent() ||
-            membershipManager.shouldSkipHeartbeat() ||
-            pollTimer.isExpired()) {
+            membershipManager.shouldSkipHeartbeat()) {
             membershipManager.onHeartbeatRequestSkipped();
             return NetworkClientDelegate.PollResult.EMPTY;
         }
         pollTimer.update(currentTimeMs);
-        if (pollTimer.isExpired()) {
-            logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " +
-                "was longer than the configured max.poll.interval.ms, which typically implies that " +
-                "the poll loop is spending too much time processing messages. You can address this " +
-                "either by increasing max.poll.interval.ms or by reducing the maximum size of batches " +
-                "returned in poll() with max.poll.records.");
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+

Review Comment: I think the logic as written is correct.
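The state checks discussed in this review can be illustrated with a small self-contained sketch. The names mirror the diff, but this is an assumption-laden model for illustration, not the actual MembershipManagerImpl: heartbeats are skipped for unsubscribed, fatally failed, or stale members, while a member that is already on its way out is treated as "leaving".

```java
// Minimal model of the membership states referenced in the diff above.
// Illustrative sketch only; not Kafka's actual consumer implementation.
enum MemberState { UNSUBSCRIBED, STABLE, PREPARE_LEAVING, LEAVING, STALE, FATAL }

final class MembershipSketch {
    private final MemberState state;

    MembershipSketch(MemberState state) { this.state = state; }

    // Skip heartbeats when unsubscribed, fatally failed, or stale (poll timer expired)
    boolean shouldSkipHeartbeat() {
        return state == MemberState.UNSUBSCRIBED
            || state == MemberState.FATAL
            || state == MemberState.STALE;
    }

    // True while preparing to leave (callbacks pending) or sending the last heartbeat;
    // used to avoid proactively leaving again when the poll timer expires mid-departure
    boolean isLeavingGroup() {
        return state == MemberState.PREPARE_LEAVING || state == MemberState.LEAVING;
    }
}
```

With these two predicates, the poll-timer-expiry path in the diff fires only for members that are neither already skipping heartbeats nor already leaving, which is the behavior the reviewer judges correct.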
[jira] [Updated] (KAFKA-16260) Remove window.size.ms from StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucia Cerchie updated KAFKA-16260: -- Description: {{window.size.ms}} is not a true KafkaStreams config, and results in an error when set from a KStreams application. It belongs on the client. > Remove window.size.ms from StreamsConfig > > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Reporter: Lucia Cerchie > Priority: Major > > {{window.size.ms}} is not a true KafkaStreams config, and results in an > error when set from a KStreams application. It belongs on the client.
[jira] [Created] (KAFKA-16260) Remove window.size.ms from StreamsConfig
Lucia Cerchie created KAFKA-16260: - Summary: Remove window.size.ms from StreamsConfig Key: KAFKA-16260 URL: https://issues.apache.org/jira/browse/KAFKA-16260 Project: Kafka Issue Type: Improvement Reporter: Lucia Cerchie
[jira] [Commented] (KAFKA-16260) Remove window.size.ms from StreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817530#comment-17817530 ] Lucia Cerchie commented on KAFKA-16260: --- see discussion https://github.com/apache/kafka/pull/14360 > Remove window.size.ms from StreamsConfig > > > Key: KAFKA-16260 > URL: https://issues.apache.org/jira/browse/KAFKA-16260 > Project: Kafka > Issue Type: Improvement > Reporter: Lucia Cerchie > Priority: Major >
[jira] [Commented] (KAFKA-15333) Flaky build failure throwing Connect Exception: Could not connect to server....
[ https://issues.apache.org/jira/browse/KAFKA-15333?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817529#comment-17817529 ] Greg Harris commented on KAFKA-15333: - Caused by this Gradle bug: https://github.com/gradle/gradle/issues/27801 > Flaky build failure throwing Connect Exception: Could not connect to > server > --- > > Key: KAFKA-15333 > URL: https://issues.apache.org/jira/browse/KAFKA-15333 > Project: Kafka > Issue Type: Test > Components: connect, unit tests >Reporter: Philip Nee >Priority: Major > > We frequently observe flaky build failure with the following message. The is > from the most recent PR post 3.5.0: > > {code:java} > > Task :generator:testClasses UP-TO-DATE > Unexpected exception thrown. > org.gradle.internal.remote.internal.MessageIOException: Could not read > message from '/127.0.0.1:38354'. > at > org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:94) > at > org.gradle.internal.remote.internal.hub.MessageHub$ConnectionReceive.run(MessageHub.java:270) > at > org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:64) > at > org.gradle.internal.concurrent.AbstractManagedExecutor$1.run(AbstractManagedExecutor.java:47) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:750) > Caused by: java.lang.IllegalArgumentException > at > org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:72) > at > org.gradle.internal.remote.internal.hub.InterHubMessageSerializer$MessageReader.read(InterHubMessageSerializer.java:52) > at > org.gradle.internal.remote.internal.inet.SocketConnection.receive(SocketConnection.java:81) > ... 
6 more > > Task :streams:upgrade-system-tests-26:unitTest > org.gradle.internal.remote.internal.ConnectException: Could not connect to > server [3156f144-9a89-4c47-91ad-88a8378ec726 port:37889, > addresses:[/127.0.0.1]]. Tried addresses: [/127.0.0.1]. > at > org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:67) > at > org.gradle.internal.remote.internal.hub.MessageHubBackedClient.getConnection(MessageHubBackedClient.java:36) > at > org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:103) > at > org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65) > at > worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) > at > worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) > Caused by: java.net.ConnectException: Connection refused > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716) > at sun.nio.ch.SocketAdaptor.connect(SocketAdaptor.java:122) > at > org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.tryConnect(TcpOutgoingConnector.java:81) > at > org.gradle.internal.remote.internal.inet.TcpOutgoingConnector.connect(TcpOutgoingConnector.java:54) > ... 5 more {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15407) Not able to connect to kafka from the Private NLB from outside the VPC account
[ https://issues.apache.org/jira/browse/KAFKA-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-15407. - Resolution: Invalid > Not able to connect to kafka from the Private NLB from outside the VPC > account > --- > > Key: KAFKA-15407 > URL: https://issues.apache.org/jira/browse/KAFKA-15407 > Project: Kafka > Issue Type: Bug > Components: clients, connect, consumer, producer , protocol > Environment: Staging, PROD >Reporter: Shivakumar >Priority: Blocker > Attachments: image-2023-08-28-12-37-33-100.png > > > !image-2023-08-28-12-37-33-100.png|width=768,height=223! > Problem statement : > We are trying to connect Kafka from another account/VPC account > Our kafka is in EKS cluster , we have service pointing to these pods for > connection > We tried to create private link endpoint form Account B to connect to our NLB > to connect to our Kafka in Account A > We see the connection reset from both client and target(kafka) in the NLB > monitoring tab of AWS. > We tried various combo of listeners and advertised listeners which did not > help us. > We are assuming we are missing some combination of Listeners and Network > level configs with which this connection can be made > Can you please guide us with this as we are blocked with a major migration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15407) Not able to connect to kafka from the Private NLB from outside the VPC account
[ https://issues.apache.org/jira/browse/KAFKA-15407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817524#comment-17817524 ] Greg Harris commented on KAFKA-15407: - Hi [~shivakumar] I don't think this is an appropriate place to request this form of support. You can reference the public documentation: [https://kafka.apache.org/documentation/#brokerconfigs] and the networking documentation for your cloud provider. > Not able to connect to kafka from the Private NLB from outside the VPC > account > --- > > Key: KAFKA-15407 > URL: https://issues.apache.org/jira/browse/KAFKA-15407 > Project: Kafka > Issue Type: Bug > Components: clients, connect, consumer, producer , protocol > Environment: Staging, PROD >Reporter: Shivakumar >Priority: Blocker > Attachments: image-2023-08-28-12-37-33-100.png > > > !image-2023-08-28-12-37-33-100.png|width=768,height=223! > Problem statement : > We are trying to connect Kafka from another account/VPC account > Our kafka is in EKS cluster , we have service pointing to these pods for > connection > We tried to create private link endpoint form Account B to connect to our NLB > to connect to our Kafka in Account A > We see the connection reset from both client and target(kafka) in the NLB > monitoring tab of AWS. > We tried various combo of listeners and advertised listeners which did not > help us. > We are assuming we are missing some combination of Listeners and Network > level configs with which this connection can be made > Can you please guide us with this as we are blocked with a major migration. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
kirktrue commented on code in PR #15375: URL: https://github.com/apache/kafka/pull/15375#discussion_r1490029659 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManager.java: ## @@ -188,18 +188,18 @@ public HeartbeatRequestManager( @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { if (!coordinatorRequestManager.coordinator().isPresent() || -membershipManager.shouldSkipHeartbeat() || -pollTimer.isExpired()) { +membershipManager.shouldSkipHeartbeat()) { membershipManager.onHeartbeatRequestSkipped(); return NetworkClientDelegate.PollResult.EMPTY; } pollTimer.update(currentTimeMs); -if (pollTimer.isExpired()) { -logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() " + -"was longer than the configured max.poll.interval.ms, which typically implies that " + -"the poll loop is spending too much time processing messages. You can address this " + -"either by increasing max.poll.interval.ms or by reducing the maximum size of batches " + -"returned in poll() with max.poll.records."); +if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { +logger.warn("Consumer poll timeout has expired. This means the time between " + +"subsequent calls to poll() was longer than the configured max.poll.interval.ms, " + +"which typically implies that the poll loop is spending too much time processing " + +"messages. You can address this either by increasing max.poll.interval.ms or by " + +"reducing the maximum size of batches returned in poll() with max.poll.records."); + Review Comment: Should the `!membershipManager.isLeavingGroup()` if clause be a nested `if` _inside_ the `if (pollTimer.isExpired()`? That is, do we really want to continue down to line 212 if the timer is expired but it's not already leaving the group? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -335,8 +335,13 @@ public int memberEpoch() { return memberEpoch; } +/** + * @return True if there hasn't been a call to consumer.poll() withing the max.poll.interval. + * In that case, it is expected that the member will leave the group and rejoin on the next + * call to consumer.poll(). + */ @Override -public boolean isStaled() { +public boolean isStale() { Review Comment: Thank you! 😄 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -335,8 +335,13 @@ public int memberEpoch() { return memberEpoch; } +/** + * @return True if there hasn't been a call to consumer.poll() withing the max.poll.interval. + * In that case, it is expected that the member will leave the group and rejoin on the next + * call to consumer.poll(). + */ @Override -public boolean isStaled() { +public boolean isStale() { Review Comment: Any reason we don't just expose the inner state via a `state()` method so that we don't have to write `isStateA`, `isStateB`, `isStateC`, etc.? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java: ## @@ -685,13 +690,6 @@ public boolean shouldHeartbeatNow() { @Override public void onHeartbeatRequestSent() { MemberState state = state(); -if (isStaled()) { -log.debug("Member {} is staled and is therefore leaving the group. It will rejoin upon the next poll.", memberEpoch); -// TODO: Integrate partition revocation/loss callback -transitionToJoining(); -return; -} - Review Comment: For the uninitiated (like myself), would you consider adding a really brief comment that provides pointers to where some of the other states (e.g. `STALE`) are handled? 
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManager.java: ## @@ -64,9 +64,11 @@ public interface MembershipManager extends RequestManager { MemberState state(); /** - * @return True if the member is staled due to expired poll timer. + * @return True if the poll timer expired, indicating that there hasn't been a call to + * consumer poll within the max poll interval. In this case, the member will proactively + * leave the group, and rejoin on the next call to poll. */ -boolean isStaled(); +boolean isStale(); Review Comment: Thank you! 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific commen
[jira] [Updated] (KAFKA-16165) Consumer invalid transition on expired poll interval
[ https://issues.apache.org/jira/browse/KAFKA-16165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16165: --- Description: Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled on the interval in some kind of scenario (maybe relates to consumer close, as the transition is leaving->stale) Log trace: [2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) [2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91) java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303) at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739) at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88) We should review the poll expiration logic that triggers a leave group operation. That is currently applied in the HB Manager poll, without any validation, and given it depends on the consumer poll timer, it could happen at any time, regardless of the state of the member. Ex. poll timer could expire when the member is leaving, leading to this leaving->stale invalid transition. We should probably consider that this pro-active leave should only apply when the consumer is not leaving (prepare leaving or leaving) was: Running system tests with the new async consumer revealed an invalid transition related to the consumer not being polled on the interval in some kind of scenario (maybe relates to consumer close, as the transition is leaving->stale) Log trace: [2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. 
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) [2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91) java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303) at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739) at org.apache.kafka.clients.consumer.inte
[jira] [Commented] (KAFKA-15841) Add Support for Topic-Level Partitioning in Kafka Connect
[ https://issues.apache.org/jira/browse/KAFKA-15841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817522#comment-17817522 ] Greg Harris commented on KAFKA-15841: - Hi [~henriquemota]! Can you share more details about your setup? Do you have a single JDBC connector, or multiple in the same cluster? Are connectors configured to read from one topic, or multiple? What is your tasks.max configuration, and what consumer assignor are you using? Consumers support distributing reading from multiple topics/topic-partitions across consumers in a consumer group, and Connect supports distributing multiple tasks across multiple workers, both of which allow you to map multiple topic-partitions of work onto multiple workers. For example, say you have N topic-partitions, and M connect workers, with N > M. Any one of the following could be used to distribute the work: 1. You could run one-connector-per-topic, and configure each connector with tasks.max=1, and have the worker distribute the N tasks across M workers. 2. You could add all N topics to a single connector with tasks.max=M, and have the consumer group distribute the N topic-partitions among those M tasks. 3. You could manually group the N topics into M groups, and create M connectors with tasks.max=1, giving each connector one group of topics. > Add Support for Topic-Level Partitioning in Kafka Connect > - > > Key: KAFKA-15841 > URL: https://issues.apache.org/jira/browse/KAFKA-15841 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Henrique Mota >Priority: Trivial > > In our organization, we utilize JDBC sink connectors to consume data from > various topics, where each topic is dedicated to a specific tenant with a > single partition. Recently, we developed a custom sink based on the standard > JDBC sink, enabling us to pause consumption of a topic when encountering > problematic records. 
> However, we face limitations within Kafka Connect, as it doesn't allow for > appropriate partitioning of topics among workers. We attempted a workaround > by breaking down the topics list within the 'topics' parameter. > Unfortunately, Kafka Connect overrides this parameter after invoking the > {{taskConfigs(int maxTasks)}} method from the > {{org.apache.kafka.connect.connector.Connector}} class. > We request the addition of support in Kafka Connect to enable the > partitioning of topics among workers without requiring a fork. This > enhancement would facilitate better load distribution and allow for more > flexible configurations, particularly in scenarios where topics are dedicated > to different tenants. -- This message was sent by Atlassian Jira (v8.20.10#820010)
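Option 3 from the comment above (manually grouping N topics into M connector/task groups) can be sketched as a small helper. This is an illustrative round-robin grouping, not code from any Kafka Connect connector; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

public class TopicGrouping {

    /**
     * Round-robin a list of topics into at most maxTasks groups, one group
     * per task. A custom Connector could return one task config per group
     * from its taskConfigs(int maxTasks) implementation.
     */
    static List<List<String>> groupTopics(List<String> topics, int maxTasks) {
        int groups = Math.min(maxTasks, topics.size());
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < topics.size(); i++) {
            // Topic i goes to group i mod groups, balancing group sizes.
            result.get(i % groups).add(topics.get(i));
        }
        return result;
    }
}
```

With three tenant topics and tasks.max=2, this yields two groups of sizes 2 and 1, which is the manual distribution the comment describes.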
[PR] KAFKA-16165: Fix invalid transition on poll timer expiration [kafka]
lianetm opened a new pull request, #15375: URL: https://github.com/apache/kafka/pull/15375 This fixes an invalid transition (leaving->stale) that was discovered in the system tests. The underlying issue was that the poll timer expiration logic was blindly forcing a transition to stale and sending a leave group, without considering that the member could already be leaving. The fix included in this PR ensures that the poll timer expiration logic, whose purpose is to leave the group, is only applied if the member is not already leaving. Note that it also fixes the transition out of the STALE state, which should only happen when the poll timer is reset. As a result of these changes: - If the poll timer expires while the member is not leaving, the poll timer expiration logic is applied: it will transition to stale, send a leave group, and remain in STALE state until the timer is reset. At that point the member will transition to JOINING to rejoin the group. - If the poll timer expires while the member is already leaving, the poll timer expiration logic does not apply, and the heartbeat simply continues. Note that this would be the case for a member in PREPARE_LEAVING waiting for callbacks to complete (which needs to continue sending heartbeats), or in LEAVING (which needs to send the last heartbeat to leave). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
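The decision the PR describes can be reduced to a small predicate: the poll-timer-expiration path (transition to STALE and send a leave-group heartbeat) applies only when the member is not already on its way out. This is a standalone sketch of that logic, not the actual Kafka internals; `MemberState` values and the method name are illustrative:

```java
// Minimal model of the fixed poll-timer decision from the PR description.
enum MemberState { STABLE, JOINING, PREPARE_LEAVING, LEAVING, STALE }

public class PollTimerLogic {

    /**
     * The expiration logic (leave the group and go STALE) only applies when
     * the poll timer expired AND the member is not already leaving; a member
     * in PREPARE_LEAVING or LEAVING must keep heartbeating to finish leaving.
     */
    static boolean shouldApplyPollTimerExpiration(boolean pollTimerExpired, MemberState state) {
        boolean alreadyLeaving = state == MemberState.PREPARE_LEAVING
                || state == MemberState.LEAVING;
        return pollTimerExpired && !alreadyLeaving;
    }

    public static void main(String[] args) {
        // Expired while stable: proactively leave and go STALE.
        System.out.println(shouldApplyPollTimerExpiration(true, MemberState.STABLE));  // true
        // Expired while already leaving: let the in-flight leave proceed.
        System.out.println(shouldApplyPollTimerExpiration(true, MemberState.LEAVING)); // false
    }
}
```

Guarding the transition this way is what prevents the LEAVING -> STALE `IllegalStateException` reported in KAFKA-16165.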
[jira] [Updated] (KAFKA-16259) Immutable MetadataCache to improve client performance
[ https://issues.apache.org/jira/browse/KAFKA-16259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhifeng Chen updated KAFKA-16259: - Description: TL;DR: a Kafka client produce-latency issue was identified, caused by synchronized lock contention on metadata cache reads/writes in the native Kafka producer. Trigger condition: a producer needs to produce to a large number of topics, such as in kafka-rest-proxy. What is the producer metadata cache? The Kafka producer maintains an in-memory copy of cluster metadata, avoiding a metadata fetch on every produce call to reduce latency. What is the synchronized lock contention problem? The producer metadata cache is a *mutable* object; reads and writes are isolated by a synchronized lock, which means all read requests are blocked while the metadata cache is being updated. Topic metadata expiration frequency increases linearly with the number of topics. In a Kafka cluster with a large number of topic partitions, topic metadata expiration and refresh trigger frequent metadata updates. When read operations are blocked by an update, producer threads stall, causing high produce latency. *Proposed solution* TL;DR: optimize the read path of the native Kafka producer's metadata cache with a copy-on-write strategy. What is the copy-on-write strategy? It reduces synchronized lock contention by making the object immutable and always creating a new instance when updating; since the object is immutable, read operations never wait, so produce latency is reduced significantly. Besides performance, it also protects the metadata cache from unexpected modification, reducing the occurrence of bugs caused by incorrect synchronization. {*}Test result{*}: Environment: kafka-rest-proxy. Client version: 2.8.0. Number of topic partitions: 250k. Test results show a 90%+ latency reduction on the test cluster !image-2024-02-14-12-11-07-366.png! 
P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part show latency after the improvement, lower part show before improvement) was: TL;DR, A Kafka client produce latency issue is identified caused by synchronized lock contention of metadata cache read/write in the native kafka producer. Trigger Condition: A producer need to produce to large number of topics. such as in kafka rest-proxy What is producer metadata cache Kafka producer maintains a in-memory copy of cluster metadata, and it avoided fetch metadata every time when produce message to reduce latency What’s the synchronized lock contention problem Kafka producer metadata cache is a *mutable* object, read/write are isolated by a synchronized lock. Which means when the metadata cache is being updated, all read requests are blocked. Topic metadata expiration frequency increase liner with number of topics. In a kafka cluster with large number of topic partitions, topic metadata expiration and refresh triggers high frequent metadata update. When read operation blocked by update, producer threads are blocked and caused high produce latency issue. *Proposed solution* TL;DR Optimize performance of metadata cache read operation of native kafka producer with copy-on-write strategy What is copy-on-write strategy It’s a solution to reduce synchronized lock contention by making the object immutable, and always create a new instance when updating, but since the object is immutable, read operation will be free from waiting, thus produce latency reduced significantly Besides performance, it can also make the metadata cache immutable from unexpected modification, reduce occurrence of code bugs due to incorrect synchronization Test result: Environment: Kafka-rest-proxy Client version: 2.8.0 Number of topic partitions: 250k test result show 90%+ latency reduction on test cluster !image-2024-02-14-12-11-07-366.png! 
P99 produce latency on deployed instances reduced from 200ms -> 5ms (upper part show latency after the improvement, lower part show before improvement) > Immutable MetadataCache to improve client performance > - > > Key: KAFKA-16259 > URL: https://issues.apache.org/jira/browse/KAFKA-16259 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.8.0 >Reporter: Zhifeng Chen >Priority: Major > Attachments: image-2024-02-14-12-11-07-366.png > > > TL;DR, A Kafka client produce latency issue is identified caused by > synchronized lock contention of metadata cache read/write in the native kafka > producer. > Trigger Condition: A producer need to produce to large number of topics. such > as in kafka rest-proxy > > > What is producer metadata cache > Kafka producer maintains a in-memory copy of cluster metadata, and it
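The copy-on-write strategy proposed in KAFKA-16259 can be sketched in a few lines: readers always dereference an immutable snapshot (no locking), while writers copy, mutate the copy, and publish the new snapshot atomically. This is an illustrative standalone class, not the producer's actual `MetadataCache`; field and method names are hypothetical:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CopyOnWriteMetadataCache {

    // volatile guarantees readers see the latest published snapshot
    // without taking any lock on the read path.
    private volatile Map<String, Integer> partitionCounts = Collections.emptyMap();

    /** Lock-free read path: just dereference the current immutable snapshot. */
    public Integer partitionCountFor(String topic) {
        return partitionCounts.get(topic);
    }

    /**
     * Write path: copy the current snapshot, mutate the copy, publish it.
     * synchronized only serializes writers; readers are never blocked.
     */
    public synchronized void update(String topic, int partitions) {
        Map<String, Integer> next = new HashMap<>(partitionCounts);
        next.put(topic, partitions);
        partitionCounts = Collections.unmodifiableMap(next);
    }
}
```

The trade-off is extra allocation and copying on each update, which is acceptable here because the issue describes a read-dominated workload where update-induced read blocking is the bottleneck.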
[jira] [Updated] (KAFKA-16145) Windows Kafka Shutdown
[ https://issues.apache.org/jira/browse/KAFKA-16145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-16145: Component/s: core (was: connect) > Windows Kafka Shutdown > -- > > Key: KAFKA-16145 > URL: https://issues.apache.org/jira/browse/KAFKA-16145 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 3.6.0 > Environment: windows, openjdk-21, kafka_2.12-3.6.0 >Reporter: user017 >Priority: Major > Fix For: 3.6.0 > > > ERROR Error while deleting segments for test.public.testtable-0 in dir > C:\tmp\kafka-logs > (org.apache.kafka.storage.internals.log.LogDirFailureChannel) > java.nio.file.FileSystemException: > C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex -> > C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex.deleted: > The process cannot access the file because it is being used by another > process > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:414) > at > java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:291) > at java.base/java.nio.file.Files.move(Files.java:1430) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:982) > at > org.apache.kafka.storage.internals.log.AbstractIndex.renameTo(AbstractIndex.java:227) > at > org.apache.kafka.storage.internals.log.LazyIndex$IndexValue.renameTo(LazyIndex.java:122) > at > org.apache.kafka.storage.internals.log.LazyIndex.renameTo(LazyIndex.java:202) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:495) > at > kafka.log.LocalLog$.$anonfun$deleteSegmentFiles$1(LocalLog.scala:917) > at > kafka.log.LocalLog$.$anonfun$deleteSegmentFiles$1$adapted(LocalLog.scala:915) > at scala.collection.immutable.List.foreach(List.scala:333) > at 
kafka.log.LocalLog$.deleteSegmentFiles(LocalLog.scala:915) > at kafka.log.LocalLog.removeAndDeleteSegments(LocalLog.scala:317) > at > kafka.log.UnifiedLog.$anonfun$deleteSegments$2(UnifiedLog.scala:1469) > at kafka.log.UnifiedLog.deleteSegments(UnifiedLog.scala:1845) > at > kafka.log.UnifiedLog.deleteRetentionMsBreachedSegments(UnifiedLog.scala:1443) > at kafka.log.UnifiedLog.deleteOldSegments(UnifiedLog.scala:1487) > at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:1282) > at > kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:1279) > at scala.collection.immutable.List.foreach(List.scala:333) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:1279) > at > kafka.log.LogManager.$anonfun$startupWithConfigOverrides$2(LogManager.scala:562) > at > org.apache.kafka.server.util.KafkaScheduler.lambda$schedule$1(KafkaScheduler.java:150) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) > at > java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:358) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) > at java.base/java.lang.Thread.run(Thread.java:1583) > Suppressed: java.nio.file.FileSystemException: > C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex -> > C:\tmp\kafka-logs\test.public.testtable-0\02043576.timeindex.deleted: > The process cannot access the file because it is being used by another > process > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at > java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:328) > at > 
java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:291) > at java.base/java.nio.file.Files.move(Files.java:1430) > at > org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:978) > ... 25 more > > > ERROR Shutdown broker because all log dirs in C:\tmp\kafka-logs have failed > (kafka.log.LogManager) > > {color:#172b4d}*Run in administrator mode, no processes running*{c
[jira] [Created] (KAFKA-16259) Immutable MetadataCache to improve client performance
Zhifeng Chen created KAFKA-16259: Summary: Immutable MetadataCache to improve client performance Key: KAFKA-16259 URL: https://issues.apache.org/jira/browse/KAFKA-16259 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 2.8.0 Reporter: Zhifeng Chen Attachments: image-2024-02-14-12-11-07-366.png TL;DR: a Kafka client produce-latency issue was identified, caused by synchronized lock contention on metadata cache reads/writes in the native Kafka producer. Trigger condition: a producer needs to produce to a large number of topics, such as in kafka-rest-proxy. What is the producer metadata cache? The Kafka producer maintains an in-memory copy of cluster metadata, avoiding a metadata fetch on every produce call to reduce latency. What is the synchronized lock contention problem? The producer metadata cache is a *mutable* object; reads and writes are isolated by a synchronized lock, which means all read requests are blocked while the metadata cache is being updated. Topic metadata expiration frequency increases linearly with the number of topics. In a Kafka cluster with a large number of topic partitions, topic metadata expiration and refresh trigger frequent metadata updates. When read operations are blocked by an update, producer threads stall, causing high produce latency. 
*Proposed solution* TL;DR: optimize the read path of the native Kafka producer's metadata cache with a copy-on-write strategy. What is the copy-on-write strategy? It reduces synchronized lock contention by making the object immutable and always creating a new instance when updating; since the object is immutable, read operations never wait, so produce latency is reduced significantly. Besides performance, it also protects the metadata cache from unexpected modification, reducing the occurrence of bugs caused by incorrect synchronization. Test result: Environment: kafka-rest-proxy. Client version: 2.8.0. Number of topic partitions: 250k. Test results show a 90%+ latency reduction on the test cluster !image-2024-02-14-12-11-07-366.png! P99 produce latency on deployed instances was reduced from 200ms -> 5ms (the upper part shows latency after the improvement, the lower part before it). -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-9790) Wrong metric bean name for connect worker metrics
[ https://issues.apache.org/jira/browse/KAFKA-9790?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-9790: --- Component/s: connect > Wrong metric bean name for connect worker metrics > - > > Key: KAFKA-9790 > URL: https://issues.apache.org/jira/browse/KAFKA-9790 > Project: Kafka > Issue Type: Bug > Components: connect > Environment: Tested with this docker image: > cnfldemos/cp-server-connect-datagen:0.2.0-5.4.0 >Reporter: Alexandre YANG >Priority: Major > Attachments: image-2020-03-31-15-11-51-031.png, > image-2020-03-31-15-16-37-573.png > > > The connect worker metrics implemented here: > * [https://github.com/apache/kafka/pull/6843/files] > * > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-475%3A+New+Metrics+to+Measure+Number+of+Tasks+on+a+Connector] > should have this bean name: > > {code:java} > kafka.connect:type=connect-worker-metrics,connector="{connector}" > {code} > but it's currently (see screenshot) > > {code:java} > kafka.connect:type=connector-metrics,connector=datagen2 > {code} > > !image-2020-03-31-15-11-51-031.png|width=673,height=371! > !image-2020-03-31-15-16-37-573.png|width=688,height=458! -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-10735) Kafka producer producing corrupted avro values when confluent cluster is recreated and producer application is not restarted
[ https://issues.apache.org/jira/browse/KAFKA-10735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-10735. - Resolution: Cannot Reproduce Hi, since this doesn't contain any Apache Kafka code, and doesn't provide any configurations, we are unable to reproduce this failure. Closing as cannot reproduce. > Kafka producer producing corrupted avro values when confluent cluster is > recreated and producer application is not restarted > > > Key: KAFKA-10735 > URL: https://issues.apache.org/jira/browse/KAFKA-10735 > Project: Kafka > Issue Type: Bug >Reporter: Tim Tattersall >Priority: Major > > *Our Environment (AWS):* > 1 x EC2 instance running 4 docker containers (using docker-compose) > * cp-kafka 5.5.1 > * cp-zookeeper 5.5.1 > * cp-schema-registry 5.5.1 > * cp-enterprise-control-center 5.5.1 > 1 x ECS service running a single java application with spring-kafka producer > Topics are using String key and Avro value > > *Problem:* > * Avro values published after confluent cluster is recreated are corrupted. > Expecting Avro json structure, received string value with corrupted Avro > details > ** Expected: {"metadata":{"nabEventVersion":"1.0","type":"Kafka IBMMQ sink > connector","schemaUrl": ...*ongoing* > ** Actual: 1.08Kafka IBMMQ source > connector^kafka-conector-ibm-mq-source-entitlements-check\Kafka IBMMQ source > connector - sourced*ongoing* > > *How to Reproduce* > # Using an existing confluent cluster > # Start a kafka producer java application (ours running with spring-kafka) > # Destroy the existing confluent cluster (using docker-compose down) > # Recreate the confluent cluster (using docker-compose up) > # Add the topic back onto the new cluster > # Trigger a message to be produced by the running Kafka producer > > *Current Workaround* > * Killing running tasks on ECS service and allowing AWS to start new ones -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12309) The revocation algorithm produces uneven distributions
[ https://issues.apache.org/jira/browse/KAFKA-12309?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-12309: Component/s: connect > The revocation algorithm produces uneven distributions > -- > > Key: KAFKA-12309 > URL: https://issues.apache.org/jira/browse/KAFKA-12309 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > Assignments: > "W0" -> 8 connectors/tasks > "W1" -> 8 connectors/tasks > (New) "W2" -> 0 connectors/tasks > Revoked (trunk) > "W0" -> 2 connectors/tasks > "W1" -> 2 connectors/tasks > Revoked (expected) > "W0" -> 2 connectors/tasks > "W1" -> 3 connectors/tasks -- This message was sent by Atlassian Jira (v8.20.10#820010)
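The uneven result above comes from revoking the same count from every loaded worker. A minimal sketch of the expected computation (a hypothetical helper, not the actual Connect assignor code): give each worker a target of floor(total/workers), hand out the remainder one by one, and revoke only the overage. For 16 connectors/tasks on 3 workers the targets are 6/5/5, so W0 should revoke 2 and W1 should revoke 3.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of balanced revocation (hypothetical; not the actual
// Connect incremental cooperative assignor). Given current allocations and the
// total worker count (including new, empty workers), compute how many
// connectors/tasks each loaded worker should give up for an even distribution.
class RevocationSketch {
    static Map<String, Integer> revocations(Map<String, Integer> current, int totalWorkers) {
        int total = current.values().stream().mapToInt(Integer::intValue).sum();
        int floor = total / totalWorkers;      // minimum each worker should end with
        int remainder = total % totalWorkers;  // this many workers may keep one extra
        Map<String, Integer> revoked = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : current.entrySet()) {
            // First `remainder` workers (in iteration order) get the higher target.
            int target = floor + (remainder > 0 ? 1 : 0);
            if (remainder > 0) remainder--;
            revoked.put(e.getKey(), Math.max(0, e.getValue() - target));
        }
        return revoked;
    }
}
```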
[jira] [Updated] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration
[ https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-10719: Component/s: mirrormaker > MirrorMaker2 fails to update its runtime configuration > -- > > Key: KAFKA-10719 > URL: https://issues.apache.org/jira/browse/KAFKA-10719 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.6.0 >Reporter: Peter Sinoros-Szabo >Priority: Major > Fix For: 3.7.0, 3.6.2 > > > I was successfully running the MM2 cluster with the following configuration (simplified a little): {code:java} clusters = main, backup > main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092 > backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092 > main->backup.enabled = true main->backup.topics = .*{code} I wanted to change the bootstrap.address list of the destination cluster to a different list pointing to the *same* cluster, just a different listener with different routing. So I changed it to: {code:java} backup.bootstrap.servers = > backupSecA:1234,backupSecB:1234,backupSecC:1234{code} I did a rolling restart on the MM2 nodes and saw that some tasks were still using the old bootstrap addresses while some were using the new ones. I don't have the logs, so unfortunately I don't know which tasks picked up the good values and which didn't. I even stopped the cluster completely, but it didn't help. Ryanne advised deleting the mm2-config and mm2-status topics, so I deleted those on the destination cluster, which solved the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration
[ https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris resolved KAFKA-10719. - Fix Version/s: 3.7.0 3.6.2 Resolution: Fixed > MirrorMaker2 fails to update its runtime configuration > -- > > Key: KAFKA-10719 > URL: https://issues.apache.org/jira/browse/KAFKA-10719 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Peter Sinoros-Szabo >Priority: Major > Fix For: 3.7.0, 3.6.2 > > > I was successfully running the MM2 cluster with the following configuration (simplified a little): {code:java} clusters = main, backup > main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092 > backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092 > main->backup.enabled = true main->backup.topics = .*{code} I wanted to change the bootstrap.address list of the destination cluster to a different list pointing to the *same* cluster, just a different listener with different routing. So I changed it to: {code:java} backup.bootstrap.servers = > backupSecA:1234,backupSecB:1234,backupSecC:1234{code} I did a rolling restart on the MM2 nodes and saw that some tasks were still using the old bootstrap addresses while some were using the new ones. I don't have the logs, so unfortunately I don't know which tasks picked up the good values and which didn't. I even stopped the cluster completely, but it didn't help. Ryanne advised deleting the mm2-config and mm2-status topics, so I deleted those on the destination cluster, which solved the issue. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-10266) Fix connector configs in docs to mention the correct default value inherited from worker configs
[ https://issues.apache.org/jira/browse/KAFKA-10266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-10266: Component/s: connect > Fix connector configs in docs to mention the correct default value inherited > from worker configs > > > Key: KAFKA-10266 > URL: https://issues.apache.org/jira/browse/KAFKA-10266 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Konstantine Karantasis >Assignee: Luke Chen >Priority: Major > > > Example: > [https://kafka.apache.org/documentation/#header.converter] > has the correct default when it is mentioned as a worker property. > But under the section of source connector configs, its default value is said > to be `null`. > Though that is correct in terms of implementation, it's confusing for users. > We should surface the correct defaults for configs that inherit (or otherwise > override) worker configs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14773) Make MirrorMaker startup synchronous
[ https://issues.apache.org/jira/browse/KAFKA-14773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14773: Component/s: mirrormaker > Make MirrorMaker startup synchronous > > > Key: KAFKA-14773 > URL: https://issues.apache.org/jira/browse/KAFKA-14773 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Divij Vaidya >Priority: Major > Labels: mirror-maker > > Currently, MirrorMaker is started using the `./bin/connect-mirror-maker.sh mm2.properties` shell command. However, even if the shell command has exited and a log line with `Kafka MirrorMaker started` has been printed, it is likely that the underlying connectors and tasks have not been configured yet. > This task aims to make the MirrorMaker startup synchronous by either waiting for connectors & tasks to move to the RUNNING state before exiting the `MirrorMaker#start()` function or by blocking completion of `main()`. > A conversation about this took place in > [https://github.com/apache/kafka/pull/13284] -- This message was sent by Atlassian Jira (v8.20.10#820010)
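The wait described above can be sketched as a simple deadline loop. This is a hypothetical helper, not the actual MirrorMaker code; in a real implementation the `allRunning` check would query the Connect REST status for every connector and task.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a synchronous-startup wait (not the actual
// MirrorMaker#start() fix): block until a status probe reports that all
// connectors and tasks are RUNNING, or give up after a deadline so the
// caller can fail fast instead of reporting a false "started".
class StartupAwait {
    static boolean awaitRunning(BooleanSupplier allRunning, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (allRunning.getAsBoolean()) {
                return true;  // startup complete; safe to return from start()
            }
            try {
                Thread.sleep(pollMs);  // back off before re-checking the status
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;  // timed out: connectors/tasks never all reached RUNNING
    }
}
```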
[jira] [Updated] (KAFKA-13656) Connect Transforms support for nested structures
[ https://issues.apache.org/jira/browse/KAFKA-13656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-13656: Component/s: connect > Connect Transforms support for nested structures > > > Key: KAFKA-13656 > URL: https://issues.apache.org/jira/browse/KAFKA-13656 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Jorge Esteban Quilcate Otoya >Priority: Major > Labels: connect-transformation, needs-kip > > Single Message Transforms (SMT), [KIP-66|https://cwiki.apache.org/confluence/display/KAFKA/KIP-66%3A+Single+Message+Transforms+for+Kafka+Connect], have greatly improved Connectors' usability by enabling processing of input/output data without the need for additional streaming applications. These benefits have been limited, as most SMT implementations only handle fields available on the root structure: > * https://issues.apache.org/jira/browse/KAFKA-7624 > * https://issues.apache.org/jira/browse/KAFKA-10640 > Therefore, this KIP aims to include support for nested structures in the existing SMTs — where this makes sense — and to include the abstractions needed to reuse this in future SMTs. > > KIP: https://cwiki.apache.org/confluence/x/BafkCw -- This message was sent by Atlassian Jira (v8.20.10#820010)
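The core operation nested-structure support needs is resolving a dotted path such as `source.table` against nested values. Below is a minimal sketch over plain Maps (hypothetical; the real SMTs operate on Connect Struct/Map values and the KIP defines escaping rules for field names containing literal dots).

```java
import java.util.Map;

// Minimal sketch of dotted-path field resolution (not the Connect
// implementation): walk each path segment into nested Map values, returning
// null if the path descends into a missing key or a non-map value.
class NestedField {
    static Object extract(Map<String, Object> record, String dottedPath) {
        Object current = record;
        for (String segment : dottedPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;  // path continues but value is not a nested structure
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }
}
```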
[jira] [Updated] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context (KIP-916)
[ https://issues.apache.org/jira/browse/KAFKA-14652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14652: Component/s: mirrormaker > Improve MM2 logging by adding the flow information to the context (KIP-916) > --- > > Key: KAFKA-14652 > URL: https://issues.apache.org/jira/browse/KAFKA-14652 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > MirrorMaker2 runs multiple Connect worker instances in a single process. In > Connect, the logging is based on the assumption that Connector names are > unique. But in MM2, the same Connector names are being used in each flow > (Connect group). This means that there is no way to differentiate between the > logs of MirrorSourceConnector in A->B and in B->A. > This can be improved by adding the flow to the logging context and the names > of the Connect framework threads. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15892) Flaky test: testAlterSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15892?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15892: Component/s: connect > Flaky test: testAlterSinkConnectorOffsets – > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest > -- > > Key: KAFKA-15892 > URL: https://issues.apache.org/jira/browse/KAFKA-15892 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Apoorv Mittal >Priority: Major > Labels: flaky-test > > h4. Error > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > alter connector offsets. Error response: \{"error_code":500,"message":"Failed > to alter consumer group offsets for connector test-connector either because > its tasks haven't stopped completely yet or the connector was resumed before > the request to alter its offsets could be successfully completed. If the > connector is in a stopped state, this operation can be safely retried. If it > doesn't eventually succeed, the Connect cluster may need to be restarted to > get rid of the zombie sink tasks."} > h4. Stacktrace > org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not > alter connector offsets. Error response: \{"error_code":500,"message":"Failed > to alter consumer group offsets for connector test-connector either because > its tasks haven't stopped completely yet or the connector was resumed before > the request to alter its offsets could be successfully completed. If the > connector is in a stopped state, this operation can be safely retried. 
If it > doesn't eventually succeed, the Connect cluster may need to be restarted to > get rid of the zombie sink tasks."} > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnect.alterConnectorOffsets(EmbeddedConnect.java:614) > at > app//org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.alterConnectorOffsets(EmbeddedConnectCluster.java:48) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:363) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsets(OffsetsApiIntegrationTest.java:287) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@11.0.16.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base@11.0.16.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@11.0.16.1/java.lang.reflect.Method.invoke(Method.java:566) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15914) Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - OffsetsApiIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15914: Component/s: connect > Flaky test - testAlterSinkConnectorOffsetsOverriddenConsumerGroupId - > OffsetsApiIntegrationTest > --- > > Key: KAFKA-15914 > URL: https://issues.apache.org/jira/browse/KAFKA-15914 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Lucas Brutschy >Priority: Major > Labels: flaky-test > > Test intermittently gives the following result: > {code} > org.opentest4j.AssertionFailedError: Condition not met within timeout 3. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: <true> but was: <false> > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.alterAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:396) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testAlterSinkConnectorOffsetsOverriddenConsumerGroupId(OffsetsApiIntegrationTest.java:297) > {code} -- This message was sent by 
Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14823) Clean up ConfigProvider API
[ https://issues.apache.org/jira/browse/KAFKA-14823?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14823: Component/s: connect > Clean up ConfigProvider API > --- > > Key: KAFKA-14823 > URL: https://issues.apache.org/jira/browse/KAFKA-14823 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Mickael Maison >Priority: Major > > The ConfigProvider interface exposes several methods that are not used: > - ConfigData get(String path) > - default void subscribe(String path, Set<String> keys, ConfigChangeCallback > callback) > - default void unsubscribe(String path, Set<String> keys, > ConfigChangeCallback callback) > - default void unsubscribeAll() > We should either build mechanisms to support them or deprecate them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
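For context, here is a minimal in-memory stand-in sketching the lookup side of the API. This is a hypothetical class, not the real interface: the actual org.apache.kafka.common.config.provider.ConfigProvider returns ConfigData objects and also declares the subscribe/unsubscribe methods listed above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for a ConfigProvider-style lookup
// (not the Kafka interface itself): values are grouped under a "path",
// and callers can fetch everything at a path or a single key under it.
class InMemoryConfigProvider {
    private final Map<String, Map<String, String>> store = new HashMap<>();

    void add(String path, String key, String value) {
        store.computeIfAbsent(path, p -> new HashMap<>()).put(key, value);
    }

    // Analogue of get(String path): all key/value pairs under a path.
    Map<String, String> get(String path) {
        return store.getOrDefault(path, new HashMap<>());
    }

    // Analogue of get(String path, Set<String> keys), narrowed to one key.
    String get(String path, String key) {
        return get(path).get(key);
    }
}
```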
[jira] [Updated] (KAFKA-15891) Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-15891?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15891: Component/s: connect > Flaky test: testResetSinkConnectorOffsetsOverriddenConsumerGroupId – > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest > --- > > Key: KAFKA-15891 > URL: https://issues.apache.org/jira/browse/KAFKA-15891 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Apoorv Mittal >Priority: Major > Labels: flaky-test > > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 3. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: <true> but was: <false> > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 3. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: <true> but was: <false> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917) > at > 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15918) Flaky test - OffsetsApiIntegrationTest.testResetSinkConnectorOffsets
[ https://issues.apache.org/jira/browse/KAFKA-15918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-15918: Component/s: connect > Flaky test - OffsetsApiIntegrationTest.testResetSinkConnectorOffsets > > > Key: KAFKA-15918 > URL: https://issues.apache.org/jira/browse/KAFKA-15918 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Haruki Okada >Priority: Major > Labels: flaky-test > Attachments: stdout.log > > > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14242/14/tests/] > > {code:java} > Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 3. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: <true> but was: <false> > Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 3. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: <true> but was: <false> > at > org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:210) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:331) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:379) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:328) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:312) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:302) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:917) > at > 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.resetAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:725) > at > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testResetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:672) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:112) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Me
[jira] [Updated] (KAFKA-13756) Connect validate endpoint should return proper response for invalid connector class
[ https://issues.apache.org/jira/browse/KAFKA-13756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-13756: Component/s: connect > Connect validate endpoint should return proper response for invalid connector > class > --- > > Key: KAFKA-13756 > URL: https://issues.apache.org/jira/browse/KAFKA-13756 > Project: Kafka > Issue Type: Improvement > Components: connect >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > Currently, if there is an issue with the connector class, the validate > endpoint returns a 400 or a 500 response. > Instead, it should return a well-formatted response containing a proper > validation error message. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-8314) Managing the doc field in case of schema projection - kafka connect
[ https://issues.apache.org/jira/browse/KAFKA-8314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-8314: --- Component/s: connect > Managing the doc field in case of schema projection - kafka connect > --- > > Key: KAFKA-8314 > URL: https://issues.apache.org/jira/browse/KAFKA-8314 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: kaushik srinivas >Priority: Major > > A doc field change in the schema while writing to HDFS using the HDFS sink connector via the Connect framework causes failures in schema projection. > > java.lang.RuntimeException: > org.apache.kafka.connect.errors.SchemaProjectorException: Schema parameters > not equal. source parameters: \{connect.record.doc=xxx} and target > parameters: \{connect.record.doc=yyy} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16258: --- Labels: kip-848-client-support (was: ) > Stale member should trigger onPartitionsLost when leaving group > --- > > Key: KAFKA-16258 > URL: https://issues.apache.org/jira/browse/KAFKA-16258 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848-client-support > Fix For: 3.8.0 > > > When the poll timer expires, the new consumer proactively leaves the group > and clears its assignments, but it should also invoke the onPartitionsLost > callback. The legacy coordinator does the following sequence on poll timer > expiration: send leave group request > ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), > invoke onPartitionsLost, and when it completes it clears the assignment > (onJoinPrepare > [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]). > This issue is most probably what is causing the failures in the integration > tests that fail expecting callbacks when the poll interval expires (like > https://issues.apache.org/jira/browse/KAFKA-16008) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817500#comment-17817500 ] Lianet Magrans edited comment on KAFKA-16008 at 2/14/24 7:25 PM: - [~kirktrue] I just added the links to the issue that most probably is causing/related to this failure was (Author: JIRAUSER300183): [~kirktrue] I just added the links to the issue that most probably is causing this failure > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. 
> (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
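The two settings named in that WARN line can be sketched as plain consumer properties; the values below are illustrative starting points, not recommendations:

```java
import java.util.Properties;

public class PollTuning {
    // Build consumer properties addressing the poll-timeout warning:
    // either allow more time between poll() calls, or shrink each batch.
    static Properties tunedConsumerProps() {
        Properties props = new Properties();
        // Give the poll loop more headroom (the default is 300000 ms / 5 minutes).
        props.setProperty("max.poll.interval.ms", "600000");
        // Return smaller batches so each poll() iteration finishes sooner
        // (the default is 500 records).
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties p = tunedConsumerProps();
        System.out.println("max.poll.interval.ms=" + p.getProperty("max.poll.interval.ms"));
    }
}
```

These properties would be passed to the consumer constructor as usual; which lever to pull depends on whether per-record processing time or batch size dominates the poll loop.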
[jira] [Commented] (KAFKA-16008) Fix PlaintextConsumerTest.testMaxPollIntervalMs
[ https://issues.apache.org/jira/browse/KAFKA-16008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817500#comment-17817500 ] Lianet Magrans commented on KAFKA-16008: [~kirktrue] I just added the links to the issue that most probably is causing this failure > Fix PlaintextConsumerTest.testMaxPollIntervalMs > --- > > Key: KAFKA-16008 > URL: https://issues.apache.org/jira/browse/KAFKA-16008 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: consumer-threading-refactor, integration-tests, timeout > Fix For: 3.8.0 > > > The integration test {{PlaintextConsumerTest.testMaxPollIntervalMs}} is > failing when using the {{AsyncKafkaConsumer}}. > The error is: > {code} > org.opentest4j.AssertionFailedError: Timed out before expected rebalance > completed > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:134) > at > kafka.api.AbstractConsumerTest.awaitRebalance(AbstractConsumerTest.scala:317) > at > kafka.api.PlaintextConsumerTest.testMaxPollIntervalMs(PlaintextConsumerTest.scala:194) > {code} > The logs include this line: > > {code} > [2023-12-13 15:11:16,134] WARN [Consumer clientId=ConsumerTestConsumer, > groupId=my-test] consumer poll timeout has expired. This means the time > between subsequent calls to poll() was longer than the configured > max.poll.interval.ms, which typically implies that the poll loop is spending > too much time processing messages. You can address this either by increasing > max.poll.interval.ms or by reducing the maximum size of batches returned in > poll() with max.poll.records. > (org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) > {code} > I don't know if that's related or not.
[jira] [Updated] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
[ https://issues.apache.org/jira/browse/KAFKA-16258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16258: --- Description: When the poll timer expires, the new consumer proactively leaves the group and clears its assignments, but it should also invoke the onPartitionsLost callback. The legacy coordinator does the following sequence on poll timer expiration: send leave group request ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), invoke onPartitionsLost, and when it completes it clears the assignment (onJoinPrepare [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]). This issue is most probably what is causing the failures in the integration tests that fail expecting callbacks when the poll interval expires (like https://issues.apache.org/jira/browse/KAFKA-16008) was:When the poll timer expires, the new consumer proactively leaves the group and clears its assignments, but it should also invoke the onPartitionsLost callback. The legacy coordinator does the following sequence on poll timer expiration: send leave group request ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), invoke onPartitionsLost, and when it completes it clears the assignment (onJoinPrepare [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]). 
> Stale member should trigger onPartitionsLost when leaving group > --- > > Key: KAFKA-16258 > URL: https://issues.apache.org/jira/browse/KAFKA-16258 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lianet Magrans >Priority: Major > Fix For: 3.8.0 > > > When the poll timer expires, the new consumer proactively leaves the group > and clears its assignments, but it should also invoke the onPartitionsLost > callback. The legacy coordinator does the following sequence on poll timer > expiration: send leave group request > ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), > invoke onPartitionsLost, and when it completes it clears the assignment > (onJoinPrepare > [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]). > This issue is most probably what is causing the failures in the integration > tests that fail expecting callbacks when the poll interval expires (like > https://issues.apache.org/jira/browse/KAFKA-16008)
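The legacy-coordinator sequence described in the ticket can be sketched as a plain-Java ordering check; the method and step names below are illustrative, not Kafka's actual coordinator API:

```java
import java.util.ArrayList;
import java.util.List;

public class PollTimerExpirationSketch {
    // Records the order of steps taken when the poll timer expires.
    final List<String> steps = new ArrayList<>();

    // Legacy-coordinator order: leave the group, fire onPartitionsLost,
    // and only clear the assignment once the callback completes.
    void onPollTimerExpired() {
        steps.add("sendLeaveGroupRequest");
        steps.add("onPartitionsLost");   // the callback KAFKA-16258 says the new consumer skips
        steps.add("clearAssignment");
    }

    public static void main(String[] args) {
        PollTimerExpirationSketch s = new PollTimerExpirationSketch();
        s.onPollTimerExpired();
        System.out.println(s.steps);
    }
}
```

The point of the ordering is that the user callback still sees the assignment before it is cleared.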
[jira] [Created] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
Lianet Magrans created KAFKA-16258: -- Summary: Stale member should trigger onPartitionsLost when leaving group Key: KAFKA-16258 URL: https://issues.apache.org/jira/browse/KAFKA-16258 Project: Kafka Issue Type: Sub-task Components: clients, consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Fix For: 3.8.0 When the poll timer expires, the new consumer proactively leaves the group and clears its assignments, but it should also invoke the onPartitionsLost callback. The legacy coordinator does the following sequence on poll timer expiration: send leave group request ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), invoke onPartitionsLost, and when it completes it clears the assignment (onJoinPrepare [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]
ahuang98 commented on PR #14206: URL: https://github.com/apache/kafka/pull/14206#issuecomment-193746 @mimaison @mumrah I moved unrelated test changes over to https://github.com/apache/kafka/pull/15373. The latest commit [1c1eb7d](https://github.com/apache/kafka/pull/15373/commits/1c1eb7d1a866c56188a97f6fa41940933e456c03) addresses the comments left here (mostly imports) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [DRAFT] [KAFKA-16069] Source Tasks re-transform records after Retriable exceptions [kafka]
wasniksudesh opened a new pull request, #15374: URL: https://github.com/apache/kafka/pull/15374 If producer.send() throws a retriableException, exactly one single record would've been transformed (but not delivered). Simply storing SourceRecord and ProducerRecord (obtained after transformations and conversion) of this iteration should be enough. In next "sendRecords()", this stored information is used to potentially skip re-transformation and re-conversion of this record. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
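A rough sketch of the caching idea in this PR description, with all types and names simplified to plain Java (the real patch deals with SourceRecord and ProducerRecord, as described above; everything here is illustrative):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class RetrySendSketch {
    final AtomicInteger transformCount = new AtomicInteger();
    String cachedTransformed;  // transformed-but-undelivered record from the last attempt

    // Transform once; on a retriable send failure keep the transformed
    // record so the next sendRecords() pass skips re-transformation.
    boolean sendRecord(String source, Function<String, String> transform,
                       Function<String, Boolean> producerSend) {
        String toSend = cachedTransformed;
        if (toSend == null) {
            transformCount.incrementAndGet();
            toSend = transform.apply(source);
        }
        if (producerSend.apply(toSend)) {
            cachedTransformed = null;   // delivered; clear the cache
            return true;
        }
        cachedTransformed = toSend;     // retriable failure; remember for the next pass
        return false;
    }
}
```

On the retry pass the cached value is sent as-is, so a transformation with side effects (or a non-deterministic SMT) is not applied a second time to the same record.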
[PR] Image test improvements [kafka]
ahuang98 opened a new pull request, #15373: URL: https://github.com/apache/kafka/pull/15373 More commit history and comments from https://github.com/apache/kafka/pull/14206 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817494#comment-17817494 ] Greg Harris edited comment on KAFKA-13505 at 2/14/24 7:01 PM: -- Hi [~twbecker] Thanks for bringing this issue to our attention. There's a couple of reasons this hasn't been addressed: * The ticket has been inactive, and other more active tickets are getting attention from contributors. * There appears to be a (third-party) workaround as described above by [~jcustenborder], and that repository appears to still be maintained and conduct releases. * The SchemaProjector is a utility class which is included in the connect API, but is not used by the framework anywhere, so only some users of some plugins experience this fault. * Enums are not a first-class object in the Connect Schema, and are instead implemented by plugins using so-called "Logical Types", which are a first-class type (e.g. STRING), combined with metadata that the plugins understand. The last one is the most relevant: Connect is doing an "enum-unaware" comparison in the SchemaProjector, because to Connect, the enum is just a strange looking string. Rather than do something incorrect, the SchemaProjector declares that it does not know how to handle the type, and errors out. What is needed is an "enum-aware" schema projector, which can only be implemented by a project that knows what the "enum" is. For Apache Kafka to solve this problem as-described, it would involve adding a special case for this one type, which is not a good solution. An alternative would be to add a first-class ENUM type, but that would present a migration challenge for the existing enum infrastructure. Another alternative is to make SchemaProjector extensible to projecting logical types. This would allow the implementors of the ENUM type (or any higher type) to inform the SchemaProjector class of how it should perform projection. 
I've opened a ticket here: KAFKA-16257 was (Author: gharris1727): Hi [~twbecker] Thanks for bringing this issue to our attention. There's a couple of reasons this hasn't been addressed: * The ticket has been inactive, and other more active tickets are getting attention from contributors. * There appears to be a (third-party) workaround as described above by [~jcustenborder], and that repository appears to still be maintained and conduct releases. * The SchemaProjector is a utility class which is included in the connect API, but is not used by the framework anywhere, so only some users of some plugins experience this fault. * Enums are not a first-class object in the Connect Schema, and are instead implemented by plugins using so-called "Logical Types", which are a first-class type (e.g. STRING), combined with metadata that the plugins understand. The last one is the most relevant: Connect is doing an "enum-unaware" comparison in the SchemaProjector, because to Connect, the enum is just a strange looking string. Rather than do something incorrect, the SchemaProjector declares that it does not know how to handle the type, and errors out. What is needed is an "enum-aware" schema projector, which can only be implemented by a project that knows what the "enum" is. For Apache Kafka to solve this problem as-described, it would involve adding a special case for this one type, which is not a good solution. An alternative would be to add a first-class ENUM type, but that would present a migration challenge for the existing enum infrastructure. Another alternative is to deprecate and remove this implementation of SchemaProjector, and replace it with a SchemaProjector which is extensible to projecting logical types. This would allow the implementors of the ENUM type (or any higher type) to inform the SchemaProjector class of how it should perform projection. 
I've opened a ticket here: KAFKA-16257 > Kafka Connect should respect Avro 1.10.X enum defaults spec > --- > > Key: KAFKA-13505 > URL: https://issues.apache.org/jira/browse/KAFKA-13505 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Guus De Graeve >Priority: Major > > We are using Kafka Connect to pipe data from Kafka topics into parquet files > on S3. Our Kafka data is serialised using Avro (schema registry). We use the > Amazon S3 Sink Connector for this. > Up until recently we would set "schema.compatibility" to "NONE" in our > connectors, but this had the pain-full side-effect that during deploys of our > application we got huge file explosions (lots of very small files in HDFS / > S3). This happens because kafka connect will create a new file every time the > schema id of a log changes compared to the previous log. During deploys of > our applications (w
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817494#comment-17817494 ] Greg Harris commented on KAFKA-13505: - Hi [~twbecker] Thanks for bringing this issue to our attention. There's a couple of reasons this hasn't been addressed: * The ticket has been inactive, and other more active tickets are getting attention from contributors. * There appears to be a (third-party) workaround as described above by [~jcustenborder], and that repository appears to still be maintained and conduct releases. * The SchemaProjector is a utility class which is included in the connect API, but is not used by the framework anywhere, so only some users of some plugins experience this fault. * Enums are not a first-class object in the Connect Schema, and are instead implemented by plugins using so-called "Logical Types", which are a first-class type (e.g. STRING), combined with metadata that the plugins understand. The last one is the most relevant: Connect is doing an "enum-unaware" comparison in the SchemaProjector, because to Connect, the enum is just a strange looking string. Rather than do something incorrect, the SchemaProjector declares that it does not know how to handle the type, and errors out. What is needed is an "enum-aware" schema projector, which can only be implemented by a project that knows what the "enum" is. For Apache Kafka to solve this problem as-described, it would involve adding a special case for this one type, which is not a good solution. An alternative would be to add a first-class ENUM type, but that would present a migration challenge for the existing enum infrastructure. Another alternative is to deprecate and remove this implementation of SchemaProjector, and replace it with a SchemaProjector which is extensible to projecting logical types. 
This would allow the implementors of the ENUM type (or any higher type) to inform the SchemaProjector class of how it should perform projection. I've opened a ticket here: KAFKA-16257 > Kafka Connect should respect Avro 1.10.X enum defaults spec > --- > > Key: KAFKA-13505 > URL: https://issues.apache.org/jira/browse/KAFKA-13505 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Guus De Graeve >Priority: Major > > We are using Kafka Connect to pipe data from Kafka topics into parquet files > on S3. Our Kafka data is serialised using Avro (schema registry). We use the > Amazon S3 Sink Connector for this. > Up until recently we would set "schema.compatibility" to "NONE" in our > connectors, but this had the pain-full side-effect that during deploys of our > application we got huge file explosions (lots of very small files in HDFS / > S3). This happens because kafka connect will create a new file every time the > schema id of a log changes compared to the previous log. During deploys of > our applications (which can take up to 20 minutes) multiple logs of mixed > schema ids are inevitable and given the huge amounts of logs file explosions > of up to a million files weren't uncommon. > To solve this problem we switched all our connectors "schema.compatibility" > to "BACKWARD", which should only create a new file when a higher schema id is > detected and deserialise all logs with the latest known schema id. Which > should only create one new file during deploys. 
> An example connector config: > {code:java} > { > "name": "hdfs-Project_Test_Subject", > "config": { > "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", > "partition.duration.ms": "8640", > "topics.dir": "/user/kafka/Project", > "hadoop.conf.dir": "/opt/hadoop/conf", > "flush.size": "100", > "schema.compatibility": "BACKWARD", > "topics": "Project_Test_Subject", > "timezone": "UTC", > "hdfs.url": "hdfs://hadoophost:9000", > "value.converter.value.subject.name.strategy": > "io.confluent.kafka.serializers.subject.TopicNameStrategy", > "rotate.interval.ms": "720", > "locale": "C", > "hadoop.home": "/opt/hadoop", > "logs.dir": "/user/kafka/_logs", > "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", > "partitioner.class": > "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", > "name": "hdfs-Project_Test_Subject", > "errors.tolerance": "all", > "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage", > "path.format": "/MM/dd" > } > }{code} > However, we have lots of enum fields in our data records (avro schemas) to > which subjects get added frequently, and this is causing issues with our > Kafka Connect connectors FAILING with these kinds o
[jira] [Created] (KAFKA-16257) SchemaProjector should be extensible to logical types
Greg Harris created KAFKA-16257: --- Summary: SchemaProjector should be extensible to logical types Key: KAFKA-16257 URL: https://issues.apache.org/jira/browse/KAFKA-16257 Project: Kafka Issue Type: New Feature Components: connect Reporter: Greg Harris The SchemaProjector currently only supports projecting primitive Number types, and cannot handle the common logical types that have proliferated in the Connect ecosystem. The SchemaProjector or a replacement should have the ability to extend its functionality to support these logical types.
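One possible shape for the extensibility the ticket asks for is a registry of projection rules keyed by logical-type name; everything below is an illustrative sketch, not Connect's actual SchemaProjector API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class ProjectorRegistrySketch {
    // Maps a logical-type name (e.g. an enum-backed string) to a projection
    // rule: (sourceValue, targetSchemaDefault) -> projectedValue.
    final Map<String, BiFunction<String, String, String>> rules = new HashMap<>();

    void register(String logicalType, BiFunction<String, String, String> rule) {
        rules.put(logicalType, rule);
    }

    // Project a value; unknown logical types fall back to identity here,
    // where an "unaware" projector would instead have to error out.
    String project(String logicalType, String value, String targetDefault) {
        BiFunction<String, String, String> rule = rules.get(logicalType);
        return rule == null ? value : rule.apply(value, targetDefault);
    }
}
```

With this shape, the plugin that defines an enum logical type could register a rule mapping symbols absent from the target schema onto the target's default, which is exactly the enum-aware behavior the KAFKA-13505 discussion says Connect itself cannot provide.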
Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]
ahuang98 commented on PR #14206: URL: https://github.com/apache/kafka/pull/14206#issuecomment-1944388740 @mimaison the PR was originally meant to introduce additional test changes, I believe @mumrah renamed it after I added the migration fix. I'll move the tests out and apply your suggestions, thanks!
Re: [PR] 16069 - prevent re-transform of source-records after retriable exceptions [kafka]
wasniksudesh closed pull request #15140: 16069 - prevent re-transform of source-records after retriable exceptions URL: https://github.com/apache/kafka/pull/15140
Re: [PR] MINOR: update leaderAndEpoch before initializing metadata publishers [kafka]
mumrah commented on code in PR #15366: URL: https://github.com/apache/kafka/pull/15366#discussion_r1489893040 ## metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java: ## @@ -197,13 +209,15 @@ private MetadataLoader( String threadNamePrefix, FaultHandler faultHandler, MetadataLoaderMetrics metrics, -Supplier highWaterMarkAccessor +Supplier highWaterMarkAccessor, +Supplier leaderAndEpochAccessor ) { this.log = logContext.logger(MetadataLoader.class); this.time = time; this.faultHandler = faultHandler; this.metrics = metrics; this.highWaterMarkAccessor = highWaterMarkAccessor; +this.leaderAndEpochAccessor = leaderAndEpochAccessor; Review Comment: Should we call this "initialLeaderAndEpochAccessor" since we only read from it during startup? Otherwise it might be confusing that we have this supplier, but update the `currentLeaderAndEpoch` in a different way
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894351 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -336,10 +342,14 @@ public ControllerResult registerBroker( ", but got cluster ID " + request.clusterId()); } int brokerId = request.brokerId(); +List records = new ArrayList<>(); BrokerRegistration existing = brokerRegistrations.get(brokerId); if (version < 2 || existing == null || request.previousBrokerEpoch() != existing.epoch()) { -// TODO(KIP-966): Update the ELR if the broker has an unclean shutdown. -log.debug("Received an unclean shutdown request"); +if (handleBrokerUncleanShutdownHelper == null) { +log.warn("No handleBrokerUncleanShutdownHelper provided"); Review Comment: Thanks for the advice! Updated.
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
CalvinConfluent commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1489894070 ## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ## @@ -780,4 +789,9 @@ public Entry> next() { } }; } + +@FunctionalInterface +interface HandleBrokerUncleanShutdownHelper { Review Comment: Done
Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
RamanVerma commented on PR #14731: URL: https://github.com/apache/kafka/pull/14731#issuecomment-1944354242 > Hey @RamanVerma what is left here? We decided to bump the API version in the KIP. That and some testing.
Re: [PR] [Draft] KIP-994 (Part 1) Minor Enhancements to ListTransactionsRequest [kafka]
jolshan commented on PR #14731: URL: https://github.com/apache/kafka/pull/14731#issuecomment-1944349380 Hey @RamanVerma what is left here?
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489860692 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: done.
[jira] [Assigned] (KAFKA-16256) Update ConsumerConfig to validate use of group.remote.assignor and partition.assignment.strategy based on group.protocol
[ https://issues.apache.org/jira/browse/KAFKA-16256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True reassigned KAFKA-16256: - Assignee: Kirk True > Update ConsumerConfig to validate use of group.remote.assignor and > partition.assignment.strategy based on group.protocol > > > Key: KAFKA-16256 > URL: https://issues.apache.org/jira/browse/KAFKA-16256 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > {{ConsumerConfig}} supports both the {{group.remote.assignor}} and > {{partition.assignment.strategy}} configuration options. These, however, > should not be used together; the former is applicable only when the > {{group.protocol}} is set to {{consumer}} and the latter when the > {{group.protocol}} is set to {{{}classic{}}}. We should emit a warning if the > user specifies the incorrect configuration based on the value of > {{{}group.protocol{}}}.
[jira] [Created] (KAFKA-16256) Update ConsumerConfig to validate use of group.remote.assignor and partition.assignment.strategy based on group.protocol
Kirk True created KAFKA-16256: - Summary: Update ConsumerConfig to validate use of group.remote.assignor and partition.assignment.strategy based on group.protocol Key: KAFKA-16256 URL: https://issues.apache.org/jira/browse/KAFKA-16256 Project: Kafka Issue Type: Improvement Components: clients, consumer Affects Versions: 3.7.0 Reporter: Kirk True Fix For: 3.8.0 {{ConsumerConfig}} supports both the {{group.remote.assignor}} and {{partition.assignment.strategy}} configuration options. These, however, should not be used together; the former is applicable only when the {{group.protocol}} is set to {{consumer}} and the latter when the {{group.protocol}} is set to {{{}classic{}}}. We should emit a warning if the user specifies the incorrect configuration based on the value of {{{}group.protocol{}}}.
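A hedged sketch of what that warning logic might look like; only the config names and the classic/consumer protocol values come from the ticket, and the helper itself is hypothetical, not ConsumerConfig's actual code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class GroupProtocolValidationSketch {
    // Returns warnings for assignor configs that don't apply to the
    // chosen group.protocol ("classic" vs "consumer").
    static List<String> validate(Map<String, String> config) {
        List<String> warnings = new ArrayList<>();
        String protocol = config.getOrDefault("group.protocol", "classic");
        if (protocol.equals("consumer") && config.containsKey("partition.assignment.strategy")) {
            warnings.add("partition.assignment.strategy is ignored when group.protocol=consumer");
        }
        if (protocol.equals("classic") && config.containsKey("group.remote.assignor")) {
            warnings.add("group.remote.assignor is ignored when group.protocol=classic");
        }
        return warnings;
    }
}
```

The ticket asks only for a warning rather than a hard failure, so a check like this would log rather than throw.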
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489832782 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: That's a good call out, i will check other methods reading from this snapshot.
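The snapshot-based approach discussed in this review can be illustrated generically: writers build a new immutable map off to the side and publish it through a volatile field, so readers never contend on a lock. This is a generic sketch of the pattern, not Kafka's actual Metadata code:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SnapshotCacheSketch {
    // Readers never lock: they dereference the current immutable snapshot.
    private volatile Map<String, Integer> snapshot = Collections.emptyMap();

    // Lock-free read path (what producer threads would call).
    Integer get(String topic) {
        return snapshot.get(topic);
    }

    // Writers build a new map and publish it atomically, so an
    // in-flight metadata update never blocks readers.
    synchronized void update(String topic, int partitions) {
        Map<String, Integer> next = new HashMap<>(snapshot);
        next.put(topic, partitions);
        snapshot = Collections.unmodifiableMap(next);
    }
}
```

A reader may briefly see a slightly stale snapshot, which is acceptable for metadata: the same staleness already exists between broker and client.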
[jira] [Updated] (KAFKA-16255) AsyncKafkaConsumer should not use partition.assignment.strategy
[ https://issues.apache.org/jira/browse/KAFKA-16255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-16255: -- Description: The {{partition.assignment.strategy}} configuration is used to specify a list of zero or more {{ConsumerPartitionAssignor}} instances. However, that interface is not applicable for the KIP-848-based protocol on top of which {{AsyncKafkaConsumer}} is built. Therefore, the use of {{ConsumerPartitionAssignor}} is inappropriate and should be removed from {{{}AsyncKafkaConsumer{}}}. (was: The partition.assignment.strategy configuration is used to specify a list of zero or more ConsumerPartitionAssignor instances. However, that interface is not applicable for the KIP-848-based protocol on top of which AsyncKafkaConsumer is built. Therefore, the use of ConsumerPartitionAssignor is in appropriate and should be removed.) > AsyncKafkaConsumer should not use partition.assignment.strategy > --- > > Key: KAFKA-16255 > URL: https://issues.apache.org/jira/browse/KAFKA-16255 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The {{partition.assignment.strategy}} configuration is used to specify a list > of zero or more {{ConsumerPartitionAssignor}} instances. However, that > interface is not applicable for the KIP-848-based protocol on top of which > {{AsyncKafkaConsumer}} is built. Therefore, the use of > {{ConsumerPartitionAssignor}} is inappropriate and should be removed from > {{{}AsyncKafkaConsumer{}}}.
[jira] [Created] (KAFKA-16255) AsyncKafkaConsumer should not use partition.assignment.strategy
Kirk True created KAFKA-16255: - Summary: AsyncKafkaConsumer should not use partition.assignment.strategy Key: KAFKA-16255 URL: https://issues.apache.org/jira/browse/KAFKA-16255 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.7.0 Reporter: Kirk True Assignee: Kirk True Fix For: 3.8.0 The partition.assignment.strategy configuration is used to specify a list of zero or more ConsumerPartitionAssignor instances. However, that interface is not applicable for the KIP-848-based protocol on top of which AsyncKafkaConsumer is built. Therefore, the use of ConsumerPartitionAssignor is inappropriate and should be removed.
Re: [PR] KAFKA-15665: Enforce partition reassignment should complete when all target replicas are in ISR [kafka]
CalvinConfluent commented on PR #15359: URL: https://github.com/apache/kafka/pull/15359#issuecomment-1944225912 testIOExceptionDuringCheckpoint failure is irrelevant to the PR.
[jira] [Created] (KAFKA-16254) Allow MM2 to fully disable offset sync feature
Omnia Ibrahim created KAFKA-16254: - Summary: Allow MM2 to fully disable offset sync feature Key: KAFKA-16254 URL: https://issues.apache.org/jira/browse/KAFKA-16254 Project: Kafka Issue Type: Bug Components: mirrormaker Affects Versions: 3.6.0, 3.5.0, 3.7.0 Reporter: Omnia Ibrahim Assignee: Omnia Ibrahim *Background:* At the moment the offset syncing feature in MM2 is split into 2 parts: # The first is in `MirrorSourceTask`, where we store each new record's offset on the target cluster in the {{offset_syncs}} internal topic after mirroring the record. Before KAFKA-14610 (in 3.5) MM2 used to just queue the offsets and publish them later, but since 3.5 this behaviour changed: we now publish any offset syncs that we've queued up, but have not yet been able to publish, whenever `MirrorSourceTask.commit` gets invoked. This introduced overhead to the commit process. # The second part is in the checkpoints source task, where we use the new record offsets from {{offset_syncs}} to update the {{checkpoints}} and {{__consumer_offsets}} topics. *Problem:* Customers who only use MM2 for mirroring data and are not interested in the offset syncing feature can currently disable only the second part, by setting {{emit.checkpoints.enabled}} and/or {{sync.group.offsets.enabled}} to stop emitting to the {{__consumer_offsets}} topic; nothing disables the first part. The problem gets worse if they also prevent MM2 from creating the offset syncs internal topic, because 1. MM2 will keep trying to update the offsets with every mirrored batch, which impacts MM2's performance, and 2. they get a flood of error logs, since the offset syncs topic they never use does not exist. *Possible solution:* Allow customers to fully disable the feature when they don't need it, by adding a new config, similar to how other MM2 features such as the heartbeat feature can already be fully disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
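For readers following along: the two existing switches mentioned above only turn off the checkpoint half of the feature. A sketch of the resulting configuration, where the last key is hypothetical, a name illustrating the new config this ticket proposes rather than one that exists today:

```properties
# Existing MM2 configs: these disable only part 2 (checkpoints / __consumer_offsets)
emit.checkpoints.enabled = false
sync.group.offsets.enabled = false

# Hypothetical new config proposed by KAFKA-16254 (name illustrative only):
# would also stop MirrorSourceTask from producing to the offset_syncs topic
emit.offset-syncs.enabled = false
```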
Re: [PR] KAFKA-15585: DescribeTopicPartitions client side change [kafka]
mumrah commented on code in PR #15265: URL: https://github.com/apache/kafka/pull/15265#discussion_r1489713255 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -2129,63 +2167,183 @@ private Map> handleDescribeTopicsByNames(f } } final long now = time.milliseconds(); -Call call = new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()), -new LeastLoadedNodeProvider()) { -private boolean supportsDisablingTopicCreation = true; +if (options.useDescribeTopicsApi()) { +RecurringCall call = new RecurringCall("DescribeTopics-Recurring", calcDeadlineMs(now, options.timeoutMs()), runnable) { +Map pendingTopics = +topicNames.stream().map(topicName -> new TopicRequest().setName(topicName)) +.collect(Collectors.toMap(topicRequest -> topicRequest.name(), topicRequest -> topicRequest, (t1, t2) -> t1, TreeMap::new) +); -@Override -MetadataRequest.Builder createRequest(int timeoutMs) { -if (supportsDisablingTopicCreation) -return new MetadataRequest.Builder(new MetadataRequestData() - .setTopics(convertToMetadataRequestTopic(topicNamesList)) -.setAllowAutoTopicCreation(false) - .setIncludeTopicAuthorizedOperations(options.includeAuthorizedOperations())); -else -return MetadataRequest.Builder.allTopics(); -} +String partiallyFinishedTopicName = ""; +int partiallyFinishedTopicNextPartitionId = -1; +TopicDescription partiallyFinishedTopicDescription = null; -@Override -void handleResponse(AbstractResponse abstractResponse) { -MetadataResponse response = (MetadataResponse) abstractResponse; -// Handle server responses for particular topics. 
-Cluster cluster = response.buildCluster(); -Map errors = response.errors(); -for (Map.Entry> entry : topicFutures.entrySet()) { -String topicName = entry.getKey(); -KafkaFutureImpl future = entry.getValue(); -Errors topicError = errors.get(topicName); -if (topicError != null) { -future.completeExceptionally(topicError.exception()); -continue; +@Override +Call generateCall() { +return new Call("describeTopics", this.deadlineMs, new LeastLoadedNodeProvider()) { +@Override +DescribeTopicPartitionsRequest.Builder createRequest(int timeoutMs) { +DescribeTopicPartitionsRequestData request = new DescribeTopicPartitionsRequestData() + .setTopics(pendingTopics.values().stream().collect(Collectors.toList())) + .setResponsePartitionLimit(options.partitionSizeLimitPerResponse()); +if (!partiallyFinishedTopicName.isEmpty()) { +request.setCursor(new DescribeTopicPartitionsRequestData.Cursor() +.setTopicName(partiallyFinishedTopicName) + .setPartitionIndex(partiallyFinishedTopicNextPartitionId) +); +} +return new DescribeTopicPartitionsRequest.Builder(request); +} + +@Override +void handleResponse(AbstractResponse abstractResponse) { +DescribeTopicPartitionsResponse response = (DescribeTopicPartitionsResponse) abstractResponse; +String cursorTopicName = ""; +int cursorPartitionId = -1; +if (response.data().nextCursor() != null) { +DescribeTopicPartitionsResponseData.Cursor cursor = response.data().nextCursor(); +cursorTopicName = cursor.topicName(); +cursorPartitionId = cursor.partitionIndex(); +} + +for (DescribeTopicPartitionsResponseTopic topic : response.data().topics()) { +String topicName = topic.name(); +Errors error = Errors.forCode(topic.errorCode()); + +KafkaFutureImpl future = topicFutures.get(topicName); +if (error != Errors.NONE) { + future.completeExceptionally(error.exception()); +
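The diff above implements cursor-based pagination: each `DescribeTopicPartitions` response may carry a `nextCursor`, and the recurring call resumes from it until no cursor remains. The control flow can be sketched generically with toy types (this is not the AdminClient API, just the pagination pattern):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of cursor-style pagination: keep issuing requests, resuming
// from the position the previous response left off at. Names are illustrative.
public class CursorPager {

    // Stand-in for a broker response: at most pageSize items starting at cursor.
    static List<Integer> page(List<Integer> all, int cursor, int pageSize) {
        return all.subList(cursor, Math.min(all.size(), cursor + pageSize));
    }

    public static List<Integer> fetchAll(List<Integer> all, int pageSize) {
        List<Integer> out = new ArrayList<>();
        int cursor = 0; // an empty cursor in the real protocol means "start from the beginning"
        while (cursor < all.size()) {
            List<Integer> p = page(all, cursor, pageSize);
            out.addAll(p);
            cursor += p.size(); // the real response carries the next cursor explicitly
        }
        return out;
    }
}
```

The real implementation additionally has to handle a cursor that stops mid-topic, which is what the `partiallyFinishedTopic*` fields in the diff track.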
Re: [PR] KAFKA-15586: Clean shutdown detection - server side [kafka]
mumrah commented on code in PR #14706: URL: https://github.com/apache/kafka/pull/14706#discussion_r1489685922 ## metadata/src/main/java/org/apache/kafka/controller/BrokersToElrs.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.metadata.Replicas; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; + +import static org.apache.kafka.metadata.Replicas.NONE; + +public class BrokersToElrs { +private final SnapshotRegistry snapshotRegistry; + +// It maps from the broker id to the topic id partitions if the partition has ELR. +private final TimelineHashMap> elrMembers; + +BrokersToElrs(SnapshotRegistry snapshotRegistry) { +this.snapshotRegistry = snapshotRegistry; +this.elrMembers = new TimelineHashMap<>(snapshotRegistry, 0); +} + +/** + * Update our records of a partition's ELR. + * + * @param topicId The topic ID of the partition. + * @param partitionId The partition ID of the partition. + * @param prevElr The previous ELR, or null if the partition is new. 
+ * @param nextElr The new ELR, or null if the partition is being removed. + */ + +void update(Uuid topicId, int partitionId, int[] prevElr, int[] nextElr) { +int[] prev; +if (prevElr == null) { +prev = NONE; +} else { +prev = Replicas.clone(prevElr); +Arrays.sort(prev); +} +int[] next; +if (nextElr == null) { +next = NONE; +} else { +next = Replicas.clone(nextElr); +Arrays.sort(next); +} + +int i = 0, j = 0; +while (true) { +if (i == prev.length) { +if (j == next.length) { +break; +} +int newReplica = next[j]; +add(newReplica, topicId, partitionId); +j++; +} else if (j == next.length) { +int prevReplica = prev[i]; +remove(prevReplica, topicId, partitionId); +i++; +} else { +int prevReplica = prev[i]; +int newReplica = next[j]; +if (prevReplica < newReplica) { +remove(prevReplica, topicId, partitionId); +i++; +} else if (prevReplica > newReplica) { +add(newReplica, topicId, partitionId); +j++; +} else { +i++; +j++; +} +} +} +} + +void removeTopicEntryForBroker(Uuid topicId, int brokerId) { +Map topicMap = elrMembers.get(brokerId); +if (topicMap != null) { +topicMap.remove(topicId); +} +} + +private void add(int brokerId, Uuid topicId, int newPartition) { +TimelineHashMap topicMap = elrMembers.get(brokerId); +if (topicMap == null) { +topicMap = new TimelineHashMap<>(snapshotRegistry, 0); +elrMembers.put(brokerId, topicMap); +} +int[] partitions = topicMap.get(topicId); +int[] newPartitions; +if (partitions == null) { +newPartitions = new int[1]; +} else { +newPartitions = new int[partitions.length + 1]; +System.arraycopy(partitions, 0, newPartitions, 0, partitions.length); +} +newPartitions[newPartitions.length - 1] = newPartition; +topicMap.put(topicId, newPartitions); +} + +private void remove(int brokerId, Uuid topicId, int removedPartition) { +TimelineHashMap topicMap = elrMembers.get(brokerId); +if (topicMap == null) { +throw new RuntimeException("Broker " + brokerId + " has no elrMembers " + +"entry, so we can't remove " + topicId + ":" + removedPartition); +} 
+int[] partitions = topicMap.get(topicId); +
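The `update` method in this diff merges two sorted replica arrays with a two-pointer walk, calling `remove()` for ids present only in the previous ELR and `add()` for ids present only in the new one. A self-contained sketch of that diff technique (class and method names are mine, not Kafka's):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch of the two-pointer merge in BrokersToElrs.update: given two
// sorted arrays, collect elements present only in prev ("removed") and only in
// next ("added"), skipping elements present in both.
public class SortedDiff {

    public static void diff(int[] prev, int[] next, List<Integer> removed, List<Integer> added) {
        int i = 0, j = 0;
        while (i < prev.length || j < next.length) {
            if (i == prev.length) {
                added.add(next[j++]);       // prev exhausted: rest of next is new
            } else if (j == next.length) {
                removed.add(prev[i++]);     // next exhausted: rest of prev was removed
            } else if (prev[i] < next[j]) {
                removed.add(prev[i++]);     // only in prev
            } else if (prev[i] > next[j]) {
                added.add(next[j++]);       // only in next
            } else {
                i++;                        // in both: unchanged
                j++;
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> removed = new ArrayList<>();
        List<Integer> added = new ArrayList<>();
        diff(new int[]{1, 3, 5}, new int[]{3, 4}, removed, added);
        System.out.println("removed=" + removed + " added=" + added); // removed=[1, 5] added=[4]
    }
}
```

This is why the production code sorts both arrays first: the linear merge only works on sorted input.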
[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817425#comment-17817425 ] Omnia Ibrahim commented on KAFKA-16212: --- Actually there is another proposal, #4, which is to wait till we drop ZK before handling this Jira. That would let us simply replace TopicPartition with TopicIdPartition, as nowhere in the code would we then have a situation where the topic id might be none. > Cache partitions by TopicIdPartition instead of TopicPartition > -- > > Key: KAFKA-16212 > URL: https://issues.apache.org/jira/browse/KAFKA-16212 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > From the discussion in [PR > 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it > would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of > {{TopicPartition}} to avoid ambiguity. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16206 Fix unnecessary topic config deletion during ZK migration [kafka]
mimaison commented on code in PR #14206: URL: https://github.com/apache/kafka/pull/14206#discussion_r1489636848 ## core/src/main/scala/kafka/zk/migration/ZkConfigMigrationClient.scala: ## @@ -226,8 +226,8 @@ class ZkConfigMigrationClient( val (migrationZkVersion, responses) = zkClient.retryMigrationRequestsUntilConnected(requests, state) if (responses.head.resultCode.equals(Code.NONODE)) { -// Not fatal. -error(s"Did not delete $configResource since the node did not exist.") +// Not fatal. This is expected in the case this is a topic config and we delete the topic (KAFKA-16206) Review Comment: We usually don't include mention to Jiras. The explanation seems sufficient. ## metadata/src/test/java/org/apache/kafka/image/ClientQuotasImageTest.java: ## @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import java.util.Collections; Review Comment: Can we move this with the other `java.util` imports? ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -64,6 +71,12 @@ public class ClusterImageTest { final static ClusterImage IMAGE2; +static final List DELTA2_RECORDS; + +static final ClusterDelta DELTA2; + +final static ClusterImage IMAGE3; Review Comment: nit: `static final` is the preferred syntax ## metadata/src/test/java/org/apache/kafka/image/ClusterImageTest.java: ## @@ -17,6 +17,11 @@ package org.apache.kafka.image; +import org.apache.kafka.common.metadata.RegisterBrokerRecord; Review Comment: Can we move these imports with the other `org.apache.kafka.common.metadata` imports below? ## metadata/src/test/java/org/apache/kafka/image/ConfigurationsImageTest.java: ## @@ -17,6 +17,7 @@ package org.apache.kafka.image; +import java.util.Collections; Review Comment: Let's move this below -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] KAFKA-14589: [2/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on PR #15363: URL: https://github.com/apache/kafka/pull/15363#issuecomment-1944040321 Hello @showuon Can you, please, take a look? I've checked CI results and it seems OK for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14589: [3/3] Tests of ConsoleGroupCommand rewritten in java [kafka]
nizhikov commented on PR #15365: URL: https://github.com/apache/kafka/pull/15365#issuecomment-1944038174 Hello @showuon Can you, please, take a look? I've checked CI results and it seems OK for me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16157: fix topic recreation handling with offline disks [kafka]
ijuma commented on code in PR #15263: URL: https://github.com/apache/kafka/pull/15263#discussion_r1489643087 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -874,7 +874,13 @@ class Partition(val topicPartition: TopicPartition, private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = { targetLogDirectoryId match { case Some(directoryId) => -createLogIfNotExists(directoryId == DirectoryId.UNASSIGNED, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) +if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) { + createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId) +} else { + warn(s"Skipping creation of log because there are potentially offline log " + Review Comment: What is the operator supposed to do when they see this warning? Generally, we should be very careful about warning logs - most times they are an anti-pattern as they are scary but not actionable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16155: Re-enable testAutoCommitIntercept [kafka]
lucasbru merged PR #15334: URL: https://github.com/apache/kafka/pull/15334 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16212) Cache partitions by TopicIdPartition instead of TopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-16212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817412#comment-17817412 ] Omnia Ibrahim commented on KAFKA-16212: --- At the moment the cache in `ReplicaManager.allPartitions` is represented as `Pool[TopicPartition, HostedPartition]`, which is a wrapper around `ConcurrentHashMap[TopicPartition, HostedPartition]`. Updating this to use `TopicIdPartition` as a key turns out to be tricky because: # not all APIs that interact with `ReplicaManager` to fetch/update the partition cache are aware of topicId, like the consumer coordinator, handling some requests in KafkaApis where the request schema doesn't have topicId, etc. # TopicId is represented as Optional in many places, which means we might end up populating it with null or a dummy uuid multiple times to construct TopicIdPartition. I have 3 proposals at the moment: * *Proposal #1:* Update TopicIdPartition to have a constructor with topicId as optional, and change `ReplicaManager.allPartitions` to be `LinkedBlockingQueue[TopicIdPartition, HostedPartition]`. _*This might be the simplest one as far as I can see.*_ ** Any API that is not topic-id aware will just get the last entry that matches topicIdPartition.topicPartition. ** The code will need to make sure that we don't have duplicates by `TopicIdPartition` in the `LinkedBlockingQueue`. ** We will need to revert having topicId as optional in TopicIdPartition once everywhere in Kafka is topic-id aware. * *Proposal #2:* Change `ReplicaManager.allPartitions` to `new Pool[TopicPartition, LinkedBlockingQueue[(Option[Uuid], HostedPartition)]]` where `Option[Uuid]` represents the topic id. This makes the cache scheme a bit complex. The proposal will ** consider the last entry in the `LinkedBlockingQueue` to be the current value ** make sure that each `LinkedBlockingQueue` has only one entry per topic id ** update topic-id-aware APIs that need to fetch/update the partition to use `TopicPartition` plus the topic id ** leave topic-id-unaware APIs using topic partitions, with the replicaManager assuming that these APIs refer to the last entry in the `LinkedBlockingQueue` * *Proposal #3:* The other option is to keep two separate caches: one `Pool[TopicIdPartition, HostedPartition]` for partitions and another `Pool[TopicPartition, Uuid]` for the last assigned topic id of each partition, in order to form `TopicIdPartition`. This is my least favourite, as having 2 caches risks that one of them can go out of date at any time. [~jolshan] Do you have any strong preferences? I am leaning toward the 1st as it is less messy than the others. WDYT? > Cache partitions by TopicIdPartition instead of TopicPartition > -- > > Key: KAFKA-16212 > URL: https://issues.apache.org/jira/browse/KAFKA-16212 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.7.0 >Reporter: Gaurav Narula >Assignee: Omnia Ibrahim >Priority: Major > > From the discussion in [PR > 15263|https://github.com/apache/kafka/pull/15263#discussion_r1471075201], it > would be better to cache {{allPartitions}} by {{TopicIdPartition}} instead of > {{TopicPartition}} to avoid ambiguity. -- This message was sent by Atlassian Jira (v8.20.10#820010)
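To make the lookup idea in proposal #1 concrete, here is a heavily simplified Java sketch. All names are illustrative stand-ins, not Kafka's actual `ReplicaManager`/`TopicIdPartition` types: topic-id-aware callers do an exact lookup, while id-unaware callers fall back to matching on topic name and partition only.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hedged sketch of a cache keyed by (optional topic id, topic name, partition),
// with a name-only fallback for callers that don't know the topic id.
public class PartitionCache {

    // simplified stand-in for a TopicIdPartition whose id is optional
    public static final class Key {
        final Optional<String> topicId;
        final String topic;
        final int partition;

        Key(Optional<String> topicId, String topic, int partition) {
            this.topicId = topicId;
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return topicId.equals(k.topicId) && topic.equals(k.topic) && partition == k.partition;
        }

        @Override public int hashCode() {
            return Objects.hash(topicId, topic, partition);
        }
    }

    private final Map<Key, String> hosted = new ConcurrentHashMap<>();

    public void put(Key key, String state) {
        hosted.put(key, state);
    }

    // topic-id-aware callers: exact key match
    public Optional<String> get(Key key) {
        return Optional.ofNullable(hosted.get(key));
    }

    // topic-id-unaware callers (e.g. requests whose schema has no topic id):
    // match by topic name and partition only
    public Optional<String> getByName(String topic, int partition) {
        return hosted.entrySet().stream()
                .filter(e -> e.getKey().topic.equals(topic) && e.getKey().partition == partition)
                .map(Map.Entry::getValue)
                .findFirst();
    }
}
```

The sketch sidesteps the ordering question ("last entry wins") that the proposals discuss; it only illustrates the dual lookup path.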
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
ijuma commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489615650 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: Why is this synchronized? We should avoid synchronized for methods that are simply reading a snapshot. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
ijuma commented on code in PR #15323: URL: https://github.com/apache/kafka/pull/15323#discussion_r1489615650 ## clients/src/main/java/org/apache/kafka/clients/Metadata.java: ## @@ -279,7 +286,7 @@ synchronized Optional partitionMetadataIfCur * @return a mapping from topic names to topic IDs for all topics with valid IDs in the cache */ public synchronized Map topicIds() { Review Comment: Why is this synchronized? We should avoid synchronized for methods that are simply reading a snapshot. Also, are there other methods like this one that don't need to be synchronized? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
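The pattern this review comment points toward can be sketched as copy-on-write publication: writers build a new immutable map under the lock and publish it through a volatile field, so readers never need `synchronized` at all. A minimal illustration (not Kafka's actual `Metadata` class):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Copy-on-write snapshot cache: the writer path stays synchronized, but readers
// just dereference a volatile field holding an immutable map.
public class SnapshotCache {

    private volatile Map<String, String> topicIds = Collections.emptyMap();

    // Writer path: copy, mutate the copy, publish atomically.
    public synchronized void update(String topic, String id) {
        Map<String, String> next = new HashMap<>(topicIds);
        next.put(topic, id);
        topicIds = Collections.unmodifiableMap(next);
    }

    // Reader path: no lock; returns a consistent, immutable snapshot.
    public Map<String, String> topicIds() {
        return topicIds;
    }
}
```

Readers may observe a slightly stale snapshot, but they are never blocked by an in-flight metadata update, which is exactly the contention the PR is trying to reduce.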
[PR] KAFKA-16243: Make sure that we do not exceed max poll interval inside poll [kafka]
lucasbru opened a new pull request, #15372: URL: https://github.com/apache/kafka/pull/15372 The consumer keeps a poll timer, which is used to ensure liveness of the application thread. In the legacy consumer, the poll timer automatically updates while the `Consumer.poll(Duration)` method is blocked, whereas the newer consumer only updates the poll timer when a new call to `Consumer.poll(Duration)` is issued. This means that the kafka-console-consumer.sh tool, which uses a very long timeout by default, works differently with the new consumer, with the consumer proactively rejoining the group during long poll timeouts. This change solves the problem by (a) repeatedly sending `PollApplicationEvents` to the background thread, not just on the first call of `poll`, and (b) making sure that the application thread doesn't block for so long that it runs out of `max.poll.interval`. An integration test is added to make sure that we do not rejoin the group when a long poll timeout is used with a low `max.poll.interval`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
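Part (b) of the fix boils down to a simple timing rule: cap each blocking wait by however much of `max.poll.interval.ms` remains. A hedged sketch of that arithmetic (names are illustrative, not the consumer's internals):

```java
// Sketch of bounding a blocking poll so the application thread never sleeps
// past the remaining max.poll.interval budget, even with a huge poll timeout.
public class PollBound {

    public static long effectiveWaitMs(long pollTimeoutMs, long maxPollIntervalMs, long msSinceLastPoll) {
        // time left before the group coordinator would consider us stalled
        long untilIntervalExpires = Math.max(0, maxPollIntervalMs - msSinceLastPoll);
        // never block longer than either the caller's timeout or that budget
        return Math.min(pollTimeoutMs, untilIntervalExpires);
    }
}
```

For example, with a one-hour poll timeout, a 5-minute `max.poll.interval.ms`, and 100 s already elapsed, the thread would wake after at most 200 s so it can send the next liveness event.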
Re: [PR] MINOR: Cleanup log.dirs in ReplicaManagerTest on JVM exit [kafka]
soarez commented on code in PR #15289: URL: https://github.com/apache/kafka/pull/15289#discussion_r1489605994 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -137,7 +137,18 @@ object TestUtils extends Logging { val parentFile = new File(parent) parentFile.mkdirs() -JTestUtils.tempDirectory(parentFile.toPath, null) +val result = JTestUtils.tempDirectory(parentFile.toPath, null) + +parentFile.deleteOnExit() +Exit.addShutdownHook("delete-temp-log-dir-on-exit", { + try { +Utils.delete(parentFile) Review Comment: Do we need both `parentFile.deleteOnExit()` and `Utils.delete(parentFile)` on the shutdown hook? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
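On the question raised here: the two mechanisms do overlap for the directory entry itself, but only a recursive delete handles a non-empty directory, since `File.deleteOnExit` (like `File.delete`) removes single files and empty directories only. A self-contained sketch of the hook-plus-recursive-delete pattern (assuming nothing about `TestUtils` beyond what the diff shows):

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// One JVM shutdown hook that recursively deletes a temp directory tree.
public class TempDirCleanup {

    // delete children before parents so each directory is empty when removed
    public static void deleteRecursively(Path root) {
        if (!Files.exists(root)) return;
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(File::delete);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // create a temp dir and arrange best-effort cleanup when the JVM exits
    public static Path tempDirDeletedOnExit(String prefix) {
        try {
            Path dir = Files.createTempDirectory(prefix);
            Runtime.getRuntime().addShutdownHook(
                new Thread(() -> deleteRecursively(dir), "delete-temp-dir-on-exit"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, a separate `deleteOnExit()` call on the same directory is redundant, which is presumably the point of the review question.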
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
OmniaGM commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1489575856 ## core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala: ## @@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness { } def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, quorum: String): Unit = { +if (isKRaftTest()) { + val value = configs.map(c => c.brokerId -> c.logDirs.contains(c.metadataLogDir)) + logger.warn(s">> ${value.mkString(",")}") +} Review Comment: https://github.com/apache/kafka/pull/15371 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINIOR: Remove accidentally logs [kafka]
OmniaGM commented on PR #15371: URL: https://github.com/apache/kafka/pull/15371#issuecomment-1943925566 cc: @jolshan can you merge this when you have a moment please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINIOR: Remove accidentally logs [kafka]
OmniaGM opened a new pull request, #15371: URL: https://github.com/apache/kafka/pull/15371 These logs were added while troubleshooting some flaky tests and were pushed by mistake in #15354 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
OmniaGM commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1489567388 ## core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala: ## @@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness { } def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, quorum: String): Unit = { +if (isKRaftTest()) { + val value = configs.map(c => c.brokerId -> c.logDirs.contains(c.metadataLogDir)) + logger.warn(s">> ${value.mkString(",")}") +} Review Comment: Will raise a minor PR to remove it. I removed it but didn't push the commit, oops -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
OmniaGM commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1489565890 ## core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala: ## @@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness { } def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, quorum: String): Unit = { +if (isKRaftTest()) { + val value = configs.map(c => c.brokerId -> c.logDirs.contains(c.metadataLogDir)) + logger.warn(s">> ${value.mkString(",")}") +} Review Comment: I thought I removed this one! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16225: Set `metadata.log.dir` to broker in KRAFT mode in integration test [kafka]
soarez commented on code in PR #15354: URL: https://github.com/apache/kafka/pull/15354#discussion_r1489560264 ## core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala: ## @@ -168,6 +169,10 @@ class LogDirFailureTest extends IntegrationTestHarness { } def testProduceAfterLogDirFailureOnLeader(failureType: LogDirFailureType, quorum: String): Unit = { +if (isKRaftTest()) { + val value = configs.map(c => c.brokerId -> c.logDirs.contains(c.metadataLogDir)) + logger.warn(s">> ${value.mkString(",")}") +} Review Comment: Did you mean to include this change? The logging format and lack of description seems odd. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Fix KafkaAdminClientTest.testClientInstanceId [kafka]
dajac opened a new pull request, #15370: URL: https://github.com/apache/kafka/pull/15370 This patch tries to address the [flakiness](https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Europe%2FZurich&tests.container=org.apache.kafka.clients.admin.KafkaAdminClientTest&tests.sortField=FLAKY&tests.test=testClientInstanceId()) of `KafkaAdminClientTest.testClientInstanceId`. The test fails with `org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.`. I believe that it is because 10ms is not enough when our CI is busy. Let's try with 1s. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13505) Kafka Connect should respect Avro 1.10.X enum defaults spec
[ https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17817385#comment-17817385 ] Tommy Becker commented on KAFKA-13505: -- I'm wondering if this bug is being overlooked because it's described in terms of third-party libraries like Confluent's S3 connector and Avro. But the issue is actually in the SchemaProjector class, which is part of Kafka Connect itself. Confluent's AvroConverter represents Avro enums in Connect Schema as Strings with "parameters" corresponding to each enum value. Even though these parameters seem like they are just intended to be metadata, SchemaProjector.checkMaybeCompatible() requires the parameters of each field to *exactly* match, so when values are added to an Avro enum, new parameters are added to the generated Connect Schema, breaking this test. Intuitively, it feels like this check could be less strict, but there is no specification for Connect Schema that I can find, so I don't know if requiring this parameter equality is correct or not. > Kafka Connect should respect Avro 1.10.X enum defaults spec > --- > > Key: KAFKA-13505 > URL: https://issues.apache.org/jira/browse/KAFKA-13505 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Guus De Graeve >Priority: Major > > We are using Kafka Connect to pipe data from Kafka topics into parquet files > on S3. Our Kafka data is serialised using Avro (schema registry). We use the > Amazon S3 Sink Connector for this. > Up until recently we would set "schema.compatibility" to "NONE" in our > connectors, but this had the painful side effect that during deploys of our > application we got huge file explosions (lots of very small files in HDFS / > S3). This happens because kafka connect will create a new file every time the > schema id of a log changes compared to the previous log. 
During deploys of > our applications (which can take up to 20 minutes), multiple logs of mixed > schema ids are inevitable, and given the huge volume of logs, file explosions > of up to a million files weren't uncommon. > To solve this problem we switched all our connectors' "schema.compatibility" > to "BACKWARD", which should only create a new file when a higher schema id is > detected and deserialise all logs with the latest known schema id, which > should only create one new file during deploys. > An example connector config: > {code:java} > { > "name": "hdfs-Project_Test_Subject", > "config": { > "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", > "partition.duration.ms": "8640", > "topics.dir": "/user/kafka/Project", > "hadoop.conf.dir": "/opt/hadoop/conf", > "flush.size": "100", > "schema.compatibility": "BACKWARD", > "topics": "Project_Test_Subject", > "timezone": "UTC", > "hdfs.url": "hdfs://hadoophost:9000", > "value.converter.value.subject.name.strategy": > "io.confluent.kafka.serializers.subject.TopicNameStrategy", > "rotate.interval.ms": "720", > "locale": "C", > "hadoop.home": "/opt/hadoop", > "logs.dir": "/user/kafka/_logs", > "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat", > "partitioner.class": > "io.confluent.connect.storage.partitioner.TimeBasedPartitioner", > "name": "hdfs-Project_Test_Subject", > "errors.tolerance": "all", > "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage", > "path.format": "/MM/dd" > } > }{code} > However, we have lots of enum fields in our data records (avro schemas) to > which subjects get added frequently, and this is causing issues with our > Kafka Connect connectors FAILING with these kinds of errors: > {code:java} > Schema parameters not equal.
source parameters: > {io.confluent.connect.avro.enum.default.testfield=null, > io.confluent.connect.avro.Enum=Ablo.testfield, > io.confluent.connect.avro.Enum.null=null, > io.confluent.connect.avro.Enum.value1=value1, > io.confluent.connect.avro.Enum.value2=value2} and target parameters: > {io.confluent.connect.avro.enum.default.testfield=null, > io.confluent.connect.avro.Enum=Ablo.testfield, > io.confluent.connect.avro.Enum.null=null, > io.confluent.connect.avro.Enum.value1=value1, > io.confluent.connect.avro.Enum.value2=value2, > io.confluent.connect.avro.Enum.value3=value3}{code} > Since Avro 1.10.X specification, enum values support defaults, which makes > schema evolution possible even when adding subjects (values) to an enum. When > testing our schemas for compatibility using the Schema Registry api we always > get "is_compatible" => true. So schema evolution should in theory not be a > problem. > The error above
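To make the failure mode concrete, here is a small stand-alone Java sketch (hypothetical names, not Connect's actual SchemaProjector code) of the exact-equality check described above: adding one enum value adds one parameter key, and the strict comparison fails.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class StrictParamCheckDemo {
    // Mimics the strict behaviour described in the comment: the source and
    // target schemas' parameter maps must be *exactly* equal to be compatible.
    static boolean parametersCompatible(Map<String, String> source, Map<String, String> target) {
        return Objects.equals(source, target);
    }

    public static void main(String[] args) {
        Map<String, String> source = new LinkedHashMap<>();
        source.put("io.confluent.connect.avro.Enum.value1", "value1");
        source.put("io.confluent.connect.avro.Enum.value2", "value2");

        Map<String, String> target = new LinkedHashMap<>(source);
        // Evolving the Avro enum adds a new parameter to the generated Connect schema...
        target.put("io.confluent.connect.avro.Enum.value3", "value3");

        // ...which makes the exact-equality check fail, mirroring the
        // "Schema parameters not equal" error quoted above.
        System.out.println(parametersCompatible(source, source)); // true
        System.out.println(parametersCompatible(source, target)); // false
    }
}
```

A subset-based check (source parameters contained in target) would accept this evolution, which is roughly what the commenter means by "less strict" — though, as noted, there is no Connect Schema specification to confirm that is correct.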
[jira] [Updated] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
[ https://issues.apache.org/jira/browse/KAFKA-16224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans updated KAFKA-16224: --- Labels: client-transitions-issues kip-848-client-support (was: kip-848-client-support) > Fix handling of deleted topic when auto-committing before revocation > > > Key: KAFKA-16224 > URL: https://issues.apache.org/jira/browse/KAFKA-16224 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: client-transitions-issues, kip-848-client-support > > Current logic for auto-committing offsets when partitions are revoked will > retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the > member not completing the revocation in time. We should consider this as an > indication of the topic being deleted, and in the context of committing > offsets to revoke partitions, we should abort the commit attempt and move on > to complete and ack the revocation (effectively considering > UnknownTopicOrPartitionException as non-retriable in this context). > Note that the legacy coordinator behaviour around this seems to be the same as > what the new consumer currently has. -- This message was sent by Atlassian Jira (v8.20.10#820010)
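The proposed behaviour can be sketched in plain Java (hypothetical names, not the actual consumer code): retriable commit errors are retried, but an unknown topic/partition is taken as "topic deleted" and the revocation proceeds rather than blocking.

```java
public class RevocationCommitSketch {
    enum CommitResult { SUCCESS, RETRIABLE_ERROR, UNKNOWN_TOPIC_OR_PARTITION }

    @FunctionalInterface
    interface Committer { CommitResult tryCommit(); }

    // Returns true when the caller may proceed to complete and ack the revocation.
    static boolean commitBeforeRevocation(Committer committer, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            switch (committer.tryCommit()) {
                case SUCCESS:
                    return true;
                case UNKNOWN_TOPIC_OR_PARTITION:
                    // Topic was likely deleted: treat as non-retriable in this
                    // context, abort the commit, and let revocation complete.
                    return true;
                case RETRIABLE_ERROR:
                    break; // loop and try the commit again
            }
        }
        return false; // gave up after maxAttempts retriable failures
    }
}
```

The key point is the second case: without it, a deleted topic keeps the commit retrying until the rebalance deadline expires, which is exactly the stuck-revocation symptom the ticket describes.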
Re: [PR] KAFKA-16155: Re-enable testAutoCommitIntercept [kafka]
cadonna commented on code in PR #15334: URL: https://github.com/apache/kafka/pull/15334#discussion_r1489442547 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -1378,6 +1377,14 @@ class PlaintextConsumerTest extends BaseConsumerTest { // after rebalancing, we should have reset to the committed positions assertEquals(10, testConsumer.committed(Set(tp).asJava).get(tp).offset) assertEquals(20, testConsumer.committed(Set(tp2).asJava).get(tp2).offset) + +// In both CLASSIC and CONSUMER protocols, interceptors are executed in poll and close. +// However, in the CONSUMER protocol, the assignment may be changed outside of a poll, so +// we need to poll once to ensure the interceptor is called. +if (groupProtocol.toUpperCase == GroupProtocol.CONSUMER.name) { Review Comment: I'm fine either way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-8977: Remove MockStreamsMetrics since it is not a mock [kafka]
cadonna commented on PR #13931: URL: https://github.com/apache/kafka/pull/13931#issuecomment-1943728460 @joobisb Could you please fix the compilation errors? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-16247) replica keep out-of-sync after migrating broker to KRaft
[ https://issues.apache.org/jira/browse/KAFKA-16247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16247. --- Resolution: Fixed Fixed in 3.7.0 RC4 > replica keep out-of-sync after migrating broker to KRaft > > > Key: KAFKA-16247 > URL: https://issues.apache.org/jira/browse/KAFKA-16247 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.7.0 >Reporter: Luke Chen >Priority: Major > Attachments: KAFKA-16247.zip > > > We are deploying 3 controllers and 3 brokers, and following the steps in > [doc|https://kafka.apache.org/documentation/#kraft_zk_migration]. When we're > moving from the "Enabling the migration on the brokers" state to the "Migrating > brokers to KRaft" state, the first rolled broker becomes out-of-sync and > never becomes in-sync. > From the log, we can see some "reject alterPartition" errors, but they just > happened 2 times. Theoretically, the leader should add the follower into the ISR > as long as the follower is fetching, since we don't have clients writing data. > But we can't figure out why it didn't fetch. > Logs: https://gist.github.com/showuon/64c4dcecb238a317bdbdec8db17fd494 > === > update Feb. 14 > After further investigating the logs, I think the reason why the replica is > not added into the ISR is that the alterPartition request got a non-retriable > error from the controller: > {code:java} > Failed to alter partition to PendingExpandIsr(newInSyncReplicaId=0, > sentLeaderAndIsr=LeaderAndIsr(leader=1, leaderEpoch=4, > isrWithBrokerEpoch=List(BrokerState(brokerId=1, brokerEpoch=-1), > BrokerState(brokerId=2, brokerEpoch=-1), BrokerState(brokerId=0, > brokerEpoch=-1)), leaderRecoveryState=RECOVERED, partitionEpoch=7), > leaderRecoveryState=RECOVERED, > lastCommittedState=CommittedPartitionState(isr=Set(1, 2), > leaderRecoveryState=RECOVERED)) because the partition epoch is invalid. > Partition state may be out of sync, awaiting new the latest metadata.
> (kafka.cluster.Partition) > [zk-broker-1-to-controller-alter-partition-channel-manager] > {code} > Since it's a non-retriable error, we'll keep the state as pending and > wait for a later leaderAndISR update, as described > [here|https://github.com/apache/kafka/blob/d24abe0edebad37e554adea47408c3063037f744/core/src/main/scala/kafka/cluster/Partition.scala#L1876C1-L1876C41]. > Log analysis: https://gist.github.com/showuon/5514cbb995fc2ae6acd5858f69c137bb > So the question becomes: > 1. Why does the controller increase the partition epoch? > 2. When the leader receives the leaderAndISR request from the controller, it > ignores the request because the leader epoch is identical, even though the > partition epoch is updated. Is this behavior expected? Will it impact the > alterPartition request later? -- This message was sent by Atlassian Jira (v8.20.10#820010)
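The rejection path in the log analysis above boils down to an epoch comparison. A hypothetical Java sketch (not the controller's real code) of that validation:

```java
public class PartitionEpochCheckSketch {
    // Sketch of the non-retriable rejection described above: an AlterPartition
    // request is only accepted when its partition epoch matches the controller's
    // current epoch for that partition. A mismatch means the request was built
    // from stale state, so the broker must keep the ISR expansion pending and
    // wait for fresh metadata (e.g. a LeaderAndIsr update) before retrying.
    static String handleAlterPartition(int requestPartitionEpoch, int currentPartitionEpoch) {
        if (requestPartitionEpoch != currentPartitionEpoch) {
            return "INVALID_UPDATE_VERSION"; // non-retriable from the broker's perspective
        }
        return "NONE"; // epoch matches: the ISR change can be committed
    }
}
```

This is why the two questions at the end matter: if the controller bumps the partition epoch but the leader discards the corresponding leaderAndISR update (because the leader epoch is unchanged), the leader's next alterPartition request will keep carrying a stale partition epoch and keep being rejected.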
[jira] [Assigned] (KAFKA-16061) KRaft JBOD follow-ups and improvements
[ https://issues.apache.org/jira/browse/KAFKA-16061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez reassigned KAFKA-16061: --- Assignee: Igor Soarez > KRaft JBOD follow-ups and improvements > -- > > Key: KAFKA-16061 > URL: https://issues.apache.org/jira/browse/KAFKA-16061 > Project: Kafka > Issue Type: Improvement >Reporter: Colin McCabe >Assignee: Igor Soarez >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16226 Reduce synchronization between producer threads [kafka]
msn-tldr commented on PR #15323: URL: https://github.com/apache/kafka/pull/15323#issuecomment-1943675980 Went through the test failures across all jdk/scala combos; they are unrelated and have been failing before this PR as well: https://ge.apache.org/s/wftzjb3q6slyc/tests/overview?outcome=FAILED https://ge.apache.org/s/lyqs6eqs4mtny/tests/overview?outcome=FAILED https://ge.apache.org/s/x6x27oapk6qsa/tests/overview?outcome=FAILED https://ge.apache.org/s/sleegbh5pfyfo/tests/overview?outcome=FAILED The test failures belong to `kafka.server.LogDirFailureTest`; they are being fixed here: https://issues.apache.org/jira/browse/KAFKA-16225 @hachikuji I believe this is good to be merged, what do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org