[jira] [Resolved] (KAFKA-15623) Migrate remaining tests in streams module to JUnit 5
[ https://issues.apache.org/jira/browse/KAFKA-15623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-15623. Fix Version/s: 3.9.0 Resolution: Fixed > Migrate remaining tests in streams module to JUnit 5 > > > Key: KAFKA-15623 > URL: https://issues.apache.org/jira/browse/KAFKA-15623 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: 黃竣陽 >Priority: Major > Fix For: 3.9.0 > > > The following special case from `build.gradle` can be removed once this is > completed: > {code:java} > if (project.name == 'streams') { > useJUnitPlatform { >includeTags "integration" >includeTags "org.apache.kafka.test.IntegrationTest" > // Both engines are needed to run JUnit 4 tests alongside JUnit 5 tests. >// junit-vintage (JUnit 4) can be removed once the JUnit 4 migration > is complete. >includeEngines "junit-vintage", "junit-jupiter" > } >} {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
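For reference, an intermediate form of that `build.gradle` block, once no JUnit 4 tests remain but before the special case is deleted outright, might look like the following. This is only a sketch; the issue's end state is removing the block entirely:

{code:groovy}
if (project.name == 'streams') {
    useJUnitPlatform {
        includeTags "integration"
        includeTags "org.apache.kafka.test.IntegrationTest"
        // junit-vintage is no longer needed once every test runs on JUnit 5.
        includeEngines "junit-jupiter"
    }
}
{code}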
[jira] [Resolved] (KAFKA-7339) Migrate from JUnit 4 to JUnit 5
[ https://issues.apache.org/jira/browse/KAFKA-7339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-7339. --- Fix Version/s: 3.9.0 Resolution: Fixed > Migrate from JUnit 4 to JUnit 5 > --- > > Key: KAFKA-7339 > URL: https://issues.apache.org/jira/browse/KAFKA-7339 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16228) Add remote log metadata flag to the dump log tool
[ https://issues.apache.org/jira/browse/KAFKA-16228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-16228: Fix Version/s: 3.9.0 > Add remote log metadata flag to the dump log tool > - > > Key: KAFKA-16228 > URL: https://issues.apache.org/jira/browse/KAFKA-16228 > Project: Kafka > Issue Type: New Feature > Components: Tiered-Storage >Affects Versions: 3.6.1 >Reporter: Federico Valeri >Assignee: Federico Valeri >Priority: Major > Labels: kip-required > Fix For: 3.9.0 > > > It would be good to improve the kafka-dump-log.sh tool adding a decode flag > for __remote_log_metadata records. Something like the following would be > useful for debugging. > {code} > bin/kafka-dump-log.sh --remote-log-metadata-decoder --files > /opt/kafka/data/__remote_log_metadata-0/.log > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17045) Move MetadataLogConfig from kafka to kafka.raft
XiDuo You created KAFKA-17045: - Summary: Move MetadataLogConfig from kafka to kafka.raft Key: KAFKA-17045 URL: https://issues.apache.org/jira/browse/KAFKA-17045 Project: Kafka Issue Type: Improvement Reporter: XiDuo You The MetadataLogConfig belongs to raft; move the file to raft to match the package name.
[jira] [Created] (KAFKA-17046) Upgrade netty version to 4.1.111.Final
XiDuo You created KAFKA-17046: - Summary: Upgrade netty version to 4.1.111.Final Key: KAFKA-17046 URL: https://issues.apache.org/jira/browse/KAFKA-17046 Project: Kafka Issue Type: Improvement Reporter: XiDuo You -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-7342) Migrate streams modules to JUnit 5
[ https://issues.apache.org/jira/browse/KAFKA-7342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860389#comment-17860389 ] Mickael Maison commented on KAFKA-7342: --- It looks like this has been done in https://issues.apache.org/jira/browse/KAFKA-15623. Marking as duplicate and closing. > Migrate streams modules to JUnit 5 > -- > > Key: KAFKA-7342 > URL: https://issues.apache.org/jira/browse/KAFKA-7342 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Ismael Juma >Assignee: Christo Lolov >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-7342) Migrate streams modules to JUnit 5
[ https://issues.apache.org/jira/browse/KAFKA-7342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-7342. --- Resolution: Duplicate > Migrate streams modules to JUnit 5 > -- > > Key: KAFKA-7342 > URL: https://issues.apache.org/jira/browse/KAFKA-7342 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Ismael Juma >Assignee: Christo Lolov >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14109) Clean up JUnit 4 test infrastructure
[ https://issues.apache.org/jira/browse/KAFKA-14109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-14109. Resolution: Duplicate > Clean up JUnit 4 test infrastructure > > > Key: KAFKA-14109 > URL: https://issues.apache.org/jira/browse/KAFKA-14109 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > We need to clean up the setup from > https://issues.apache.org/jira/browse/KAFKA-14108 once the JUnit 4 to JUnit 5 > migration is complete.
[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support
[ https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860390#comment-17860390 ] Mickael Maison commented on KAFKA-14218: [~stillya] All tests have been updated to JUnit 5 now. Can you update your PR? Thanks > replace temp file handler with JUnit 5 Temporary Directory Support > -- > > Key: KAFKA-14218 > URL: https://issues.apache.org/jira/browse/KAFKA-14218 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Luke Chen >Assignee: Ganesh Sahu >Priority: Major > Labels: Newbie, newbie > > We create many temp files in tests, and sometimes forget to delete them > after use. Instead of polluting @AfterEach in each test, we should > consider using the JUnit 5 TempDirectory extension. > > REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431] > 2. [https://www.baeldung.com/junit-5-temporary-directory]
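For context on what the TempDirectory extension saves: a minimal plain-Java sketch of the manual create-and-clean-up pattern that tests currently do by hand. The class and file names here are illustrative, not taken from the Kafka code base; with JUnit 5's {{@TempDir}} the annotated directory is injected and deleted by the framework, so the cleanup walk below disappears.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TempDirDemo {
    public static void main(String[] args) throws IOException {
        // What tests do by hand today: create a temp dir and write into it...
        Path dir = Files.createTempDirectory("kafka-test-");
        Files.writeString(dir.resolve("state"), "offsets");

        // ...and remember to delete it afterwards (the step that gets forgotten,
        // polluting @AfterEach methods across the test suite).
        try (Stream<Path> walk = Files.walk(dir)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        }
        System.out.println(Files.exists(dir)); // false once cleanup ran
    }
}
```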
[jira] [Updated] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-10413: --- Attachment: rebalance.sh > rebalancing leads to unevenly balanced connectors > - > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 2.5.1 >Reporter: yazgoo >Assignee: rameshkrishnan muthusamy >Priority: Major > Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > Attachments: connect_worker_balanced.png, rebalance.sh > > > Hi, > With CP 5.5, running the kafka connect s3 sink on EC2 with autoscaling enabled, > if a connect instance disappears, or a new one appears, we're seeing unbalanced > consumption, much like mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one kafka connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestion on what we could do to mitigate this?
[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860408#comment-17860408 ] yazgoo commented on KAFKA-10413: Hello, I launched the attached script: [^rebalance.sh] In one of my tests I get one connector well balanced: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 15 "worker_id": "k2:8082" 15 "worker_id": "k3:8083" 15 "worker_id": "k4:8084" 15 "worker_id": "k5:8085" 15 "worker_id": "k6:8086" 15 "worker_id": "k7:8087" 15 "worker_id": "k8:8088" 15 "worker_id": "k9:8089" {code} and the other one unbalanced: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 27 "worker_id": "k1:8081" 11 "worker_id": "k2:8082" 12 "worker_id": "k3:8083" 11 "worker_id": "k4:8084" 12 "worker_id": "k5:8085" 12 "worker_id": "k6:8086" 11 "worker_id": "k7:8087" 12 "worker_id": "k8:8088" 12 "worker_id": "k9:8089" {code} Regards > rebalancing leads to unevenly balanced connectors > - > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 2.5.1 >Reporter: yazgoo >Assignee: rameshkrishnan muthusamy >Priority: Major > Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > Attachments: connect_worker_balanced.png, rebalance.sh > > > Hi, > With CP 5.5, running the kafka connect s3 sink on EC2 with autoscaling enabled, > if a connect instance disappears, or a new one appears, we're seeing unbalanced > consumption, much like mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one kafka connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestion on what we could do to mitigate this?
[jira] [Resolved] (KAFKA-16754) Implement release acquired records functionality in SharePartition
[ https://issues.apache.org/jira/browse/KAFKA-16754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abhinav Dixit resolved KAFKA-16754. --- Fix Version/s: 4.0.0 3.9.0 Resolution: Fixed > Implement release acquired records functionality in SharePartition > -- > > Key: KAFKA-16754 > URL: https://issues.apache.org/jira/browse/KAFKA-16754 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Abhinav Dixit >Priority: Major > Fix For: 4.0.0, 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16822) Abstract consumer group in coordinator to share functionality with share group
[ https://issues.apache.org/jira/browse/KAFKA-16822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16822. - Fix Version/s: 3.9.0 Resolution: Fixed > Abstract consumer group in coordinator to share functionality with share group > -- > > Key: KAFKA-16822 > URL: https://issues.apache.org/jira/browse/KAFKA-16822 > Project: Kafka > Issue Type: Sub-task >Reporter: Apoorv Mittal >Assignee: Apoorv Mittal >Priority: Major > Fix For: 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17046) Upgrade netty version to 4.1.111.Final
[ https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860474#comment-17860474 ] 黃竣陽 commented on KAFKA-17046: - I'm interested in this issue. Could you assign it to me? > Upgrade netty version to 4.1.111.Final > -- > > Key: KAFKA-17046 > URL: https://issues.apache.org/jira/browse/KAFKA-17046 > Project: Kafka > Issue Type: Improvement >Reporter: XiDuo You >Priority: Minor >
[jira] [Resolved] (KAFKA-12911) Configure automatic formatter for org.apache.kafka.streams.processor
[ https://issues.apache.org/jira/browse/KAFKA-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12911. Resolution: Duplicate > Configure automatic formatter for org.apache.kafka.streams.processor > > > Key: KAFKA-12911 > URL: https://issues.apache.org/jira/browse/KAFKA-12911 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > > As an incremental approach to introduce automatic code formatter, we will > configure automatic formatter for org.apache.kafka.streams.processor package > with 127 (main) + 78 (test) = 205 files. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-12910) Configure automatic formatter for org.apache.kafka.streams.state
[ https://issues.apache.org/jira/browse/KAFKA-12910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-12910. Resolution: Duplicate > Configure automatic formatter for org.apache.kafka.streams.state > > > Key: KAFKA-12910 > URL: https://issues.apache.org/jira/browse/KAFKA-12910 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Major > > As of 48379bd6e5, there are 893 java files in the streams module. > As an incremental approach to introducing an automatic code formatter, we will > configure the formatter for the org.apache.kafka.streams.state package > with 147 (main) + 91 (test) = 238 files.
[jira] [Resolved] (KAFKA-10787) Introduce an import order in Java sources
[ https://issues.apache.org/jira/browse/KAFKA-10787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10787. Resolution: Fixed > Introduce an import order in Java sources > - > > Key: KAFKA-10787 > URL: https://issues.apache.org/jira/browse/KAFKA-10787 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Dongjin Lee >Assignee: xuanzhang gong >Priority: Major > Fix For: 3.9.0 > > > As of present, Kafka uses a relatively strict code style for Java code, > except import order. For this reason, the code formatting settings of every > local dev environment are different from person to person, resulting in > countless meaningless import order changes in the PR. > This issue aims to define and apply a 3-group import order, like the > following: > 1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} > 2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}} > 3. Java packages: {{java.*}}, {{javax.*}} > Discussion Thread: > https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E -- This message was sent by Atlassian Jira (v8.20.10#820010)
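Expressed as a Checkstyle rule, the three-group order could look roughly like the following. This is a sketch of one possible configuration, not necessarily the rule Kafka adopted:

{code:xml}
<module name="ImportOrder">
  <!-- 1: project packages, 2: third-party (the * wildcard), 3: javax/java -->
  <property name="groups" value="/^kafka\./,org.apache.kafka,*,javax,java"/>
  <property name="ordered" value="true"/>
  <property name="separated" value="true"/>
  <!-- place static imports after all regular imports -->
  <property name="option" value="bottom"/>
</module>
{code}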
[jira] [Comment Edited] (KAFKA-17046) Upgrade netty version to 4.1.111.Final
[ https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860474#comment-17860474 ] 黃竣陽 edited comment on KAFKA-17046 at 6/27/24 12:53 PM: --- I'm interested in this issue. Could you assign it to me? was (Author: JIRAUSER305187): Im intersting in this issue, Could you assign to me > Upgrade netty version to 4.1.111.Final > -- > > Key: KAFKA-17046 > URL: https://issues.apache.org/jira/browse/KAFKA-17046 > Project: Kafka > Issue Type: Improvement >Reporter: XiDuo You >Priority: Minor >
[jira] [Commented] (KAFKA-17046) Upgrade netty version to 4.1.111.Final
[ https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860476#comment-17860476 ] Chia-Ping Tsai commented on KAFKA-17046: It seems there is a PR already https://github.com/apache/kafka/pull/16469 > Upgrade netty version to 4.1.111.Final > -- > > Key: KAFKA-17046 > URL: https://issues.apache.org/jira/browse/KAFKA-17046 > Project: Kafka > Issue Type: Improvement >Reporter: XiDuo You >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17046) Upgrade netty version to 4.1.111.Final
[ https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17046: -- Assignee: XiDuo You > Upgrade netty version to 4.1.111.Final > -- > > Key: KAFKA-17046 > URL: https://issues.apache.org/jira/browse/KAFKA-17046 > Project: Kafka > Issue Type: Improvement >Reporter: XiDuo You >Assignee: XiDuo You >Priority: Minor > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17047) Refactor Consumer group and shared classes with Share to modern package
Apoorv Mittal created KAFKA-17047: - Summary: Refactor Consumer group and shared classes with Share to modern package Key: KAFKA-17047 URL: https://issues.apache.org/jira/browse/KAFKA-17047 Project: Kafka Issue Type: Sub-task Reporter: Apoorv Mittal Assignee: Apoorv Mittal -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-17028) FindCoordinator v6 initial implementation
[ https://issues.apache.org/jira/browse/KAFKA-17028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andrew Schofield resolved KAFKA-17028. -- Fix Version/s: 3.9.0 Resolution: Fixed > FindCoordinator v6 initial implementation > - > > Key: KAFKA-17028 > URL: https://issues.apache.org/jira/browse/KAFKA-17028 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Fix For: 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860408#comment-17860408 ] yazgoo edited comment on KAFKA-10413 at 6/27/24 1:38 PM: - Hello, I launched the attached script: [^rebalance.sh] In my test, after waiting a few minutes, I get one connector well balanced: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 15 "worker_id": "k2:8082" 15 "worker_id": "k3:8083" 15 "worker_id": "k4:8084" 15 "worker_id": "k5:8085" 15 "worker_id": "k6:8086" 15 "worker_id": "k7:8087" 15 "worker_id": "k8:8088" 15 "worker_id": "k9:8089" {code} and the other one unbalanced: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 27 "worker_id": "k1:8081" 11 "worker_id": "k2:8082" 12 "worker_id": "k3:8083" 11 "worker_id": "k4:8084" 12 "worker_id": "k5:8085" 12 "worker_id": "k6:8086" 11 "worker_id": "k7:8087" 12 "worker_id": "k8:8088" 12 "worker_id": "k9:8089" {code} Regards > rebalancing leads to unevenly balanced connectors > - > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 2.5.1 >Reporter: yazgoo >Assignee: rameshkrishnan muthusamy >Priority: Major > Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > Attachments: connect_worker_balanced.png, rebalance.sh > > > Hi, > With CP 5.5, running the kafka connect s3 sink on EC2 with autoscaling enabled, > if a connect instance disappears, or a new one appears, we're seeing unbalanced > consumption, much like mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one kafka connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestion on what we could do to mitigate this?
[jira] [Commented] (KAFKA-16791) Add thread detection to RaftClusterInvocationContext/ZkClusterInvocationContext
[ https://issues.apache.org/jira/browse/KAFKA-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860492#comment-17860492 ] PoAn Yang commented on KAFKA-16791: --- Hi [~bboyleonp], if you're not working on this issue, may I try it? Thank you. > Add thread detection to > RaftClusterInvocationContext/ZkClusterInvocationContext > --- > > Key: KAFKA-16791 > URL: https://issues.apache.org/jira/browse/KAFKA-16791 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: bboyleonp >Priority: Minor > > -`ClusterTestExtensions` should implement `BeforeAllCallback` and > `AfterAllCallback` by `TestUtils.verifyNoUnexpectedThreads`- > We can instead leverage `BeforeEachCallback`/`AfterEachCallback` to implement the > thread detection per test. Note that the new > thread detection should avoid cascading failures - a thread leak should make the > specific test case fail (rather than all subsequent test cases).
[jira] [Assigned] (KAFKA-16791) Add thread detection to RaftClusterInvocationContext/ZkClusterInvocationContext
[ https://issues.apache.org/jira/browse/KAFKA-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-16791: -- Assignee: PoAn Yang (was: bboyleonp) > Add thread detection to > RaftClusterInvocationContext/ZkClusterInvocationContext > --- > > Key: KAFKA-16791 > URL: https://issues.apache.org/jira/browse/KAFKA-16791 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: PoAn Yang >Priority: Minor > > -`ClusterTestExtensions` should implement `BeforeAllCallback` and > `AfterAllCallback` by `TestUtils.verifyNoUnexpectedThreads`- > We can leverage `BeforeEachCallback`/`AfterEachCallback` to implement new > AfterEachCallback instead of `verifyNoUnexpectedThreads`. Notice the new > thread detection should avoid cascading failure - the thread leak should make > specific test case (rather than all subsequent test cases) fail -- This message was sent by Atlassian Jira (v8.20.10#820010)
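The detection itself can be sketched in plain Java, independent of the JUnit 5 callback wiring: snapshot the live thread names before the test and diff them after, so only the leaking test fails rather than the cases that follow it. Names below are illustrative, not Kafka's {{TestUtils}} API:

```java
import java.util.HashSet;
import java.util.Set;

public class ThreadLeakCheck {
    // Capture the names of all currently live threads.
    static Set<String> liveThreads() {
        Set<String> names = new HashSet<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            names.add(t.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        Set<String> before = liveThreads();  // what a beforeEach callback would record

        // Simulated test body that leaks a thread.
        Thread leaked = new Thread(() -> {
            try { Thread.sleep(60_000); } catch (InterruptedException ignored) { }
        }, "leaked-io-thread");
        leaked.setDaemon(true);
        leaked.start();

        Set<String> after = liveThreads();   // what an afterEach callback would check
        after.removeAll(before);
        // Failing here marks only the leaking test, avoiding cascading failures.
        System.out.println("leaked: " + after);
        leaked.interrupt();
    }
}
```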
[jira] [Commented] (KAFKA-16364) MM2 High-Resolution Offset Translation
[ https://issues.apache.org/jira/browse/KAFKA-16364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860507#comment-17860507 ] M Mehrtens commented on KAFKA-16364: This is especially relevant for cluster migration scenarios when consumer group offsets should be replicated with a minimum latency. Ideally, consumer groups could detach from the source cluster and attach to the target cluster without needing to process a batch of duplicate messages. This would require offset sync/translation for inactive consumer groups - which seems like a pretty reasonable addition, especially if the consumer groups were previously synced. > MM2 High-Resolution Offset Translation > -- > > Key: KAFKA-16364 > URL: https://issues.apache.org/jira/browse/KAFKA-16364 > Project: Kafka > Issue Type: New Feature > Components: mirrormaker >Reporter: Greg Harris >Priority: Minor > Labels: needs-kip > > The current OffsetSyncStore implementation > [https://github.com/apache/kafka/blob/8b72a2c72f09838fdd2e7416c98d30fe876b4078/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L57] > stores a sparse index of offset syncs. This attempts to strike a balanced > default behavior between offset translation availability, memory usage, and > throughput on the offset syncs topic. > However, this balanced default behavior is not good enough in all > circumstances. When precise offset translation is needed away from the end of > the topic, such as for consumer groups with persistent lag, offset > translation can be more precise. Users should have a way to configure > high-precision offset translation, either through additional memory usage or > re-reading the offset syncs topic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
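To illustrate why a sparse index limits precision: a toy translator that keeps only every 64th sync and rounds a consumer offset down to the nearest retained sync. This is purely illustrative, not the real {{OffsetSyncStore}} logic; the offsets and spacing are invented.

```java
import java.util.Map;
import java.util.TreeMap;

public class SparseOffsetTranslation {
    public static void main(String[] args) {
        // upstreamOffset -> downstreamOffset, keeping only every 64th sync.
        TreeMap<Long, Long> syncs = new TreeMap<>();
        for (long up = 0; up <= 512; up += 64) {
            syncs.put(up, up + 1000);  // fake downstream offsets
        }

        // Translate an upstream consumer offset of 200: the nearest retained
        // sync below it is 192, so translation conservatively loses 8 offsets.
        long upstream = 200;
        Map.Entry<Long, Long> entry = syncs.floorEntry(upstream);
        System.out.println("translated " + upstream + " -> " + entry.getValue()
                + " (lost " + (upstream - entry.getKey()) + " offsets of precision)");
    }
}
```

A denser index (more memory, or re-reading the syncs topic) shrinks that rounding loss, which is the trade-off the issue describes.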
[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514 ] yazgoo commented on KAFKA-10413: Here is yet another, simpler version of the script, with fewer workers, and which does not try to restart any worker:
{code:bash}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done

sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}
When the script ends, each connector's tasks sit on a single worker: one worker has connector #1's tasks, another has connector #2's. Then I wait 3 minutes and I get the final state: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 30 "worker_id": "k2:8082" 30 "worker_id": "k3:8083" 30 "worker_id": "k4:8084" 30 "worker_id": "k5:8085" {code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 48 "worker_id": "k1:8081" 18 "worker_id": "k2:8082" 18 "worker_id": "k3:8083" 18 "worker_id": "k4:8084" 18 "worker_id": "k5:8085" {code}
[jira] [Commented] (KAFKA-7339) Migrate from JUnit 4 to JUnit 5
[ https://issues.apache.org/jira/browse/KAFKA-7339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860515#comment-17860515 ] Ismael Juma commented on KAFKA-7339: Yay! > Migrate from JUnit 4 to JUnit 5 > --- > > Key: KAFKA-7339 > URL: https://issues.apache.org/jira/browse/KAFKA-7339 > Project: Kafka > Issue Type: Improvement >Reporter: Ismael Juma >Assignee: Chia-Ping Tsai >Priority: Major > Fix For: 3.9.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514 ] yazgoo edited comment on KAFKA-10413 at 6/27/24 2:39 PM: - Here is yet another, simpler version of the script, with fewer workers, which does not try to restart any worker:
{code:java}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done

sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}
When the script ends, I have one worker with connector #1 tasks, the other one with connector #2 tasks. Then I wait 3 minutes and I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
  30 "worker_id": "k2:8082"
  30 "worker_id": "k3:8083"
  30 "worker_id": "k4:8084"
  30 "worker_id": "k5:8085"
{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
  48 "worker_id": "k1:8081"
  18 "worker_id": "k2:8082"
  18 "worker_id": "k3:8083"
  18 "worker_id": "k4:8084"
  18 "worker_id": "k5:8085"
{code}
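The `curl | jq | grep | sort | uniq -c` pipeline above can be mirrored by a small Python sketch. Here `tasks_per_worker` and `fake_worker` are hypothetical helpers, and the sample payload is fabricated to match the 48/18/18/18/18 counts reported above rather than fetched from a live cluster; only the shape of the Connect `GET /connectors/<name>/status` response (a `tasks` array whose entries carry a `worker_id`) is assumed:

```python
from collections import Counter

def tasks_per_worker(status):
    # Tally how many of one connector's tasks run on each worker,
    # given a dict shaped like GET /connectors/<name>/status.
    return Counter(task["worker_id"] for task in status["tasks"])

def fake_worker(i):
    # Fabricated placement reproducing the uneven split reported above.
    if i < 48:
        return "k1:8081"
    n = 2 + (i - 48) // 18  # 18 tasks each on k2..k5
    return f"k{n}:808{n}"

status = {"name": "s3-connector1",
          "tasks": [{"id": i, "worker_id": fake_worker(i), "state": "RUNNING"}
                    for i in range(120)]}
print(dict(sorted(tasks_per_worker(status).items())))
# {'k1:8081': 48, 'k2:8082': 18, 'k3:8083': 18, 'k4:8084': 18, 'k5:8085': 18}
```

Against a real cluster, `status` would instead come from `curl -s http://localhost:8081/connectors/s3-connector1/status` parsed with `json.loads`.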
[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514 ] yazgoo edited comment on KAFKA-10413 at 6/27/24 3:08 PM: ----- Here is yet another, simpler version of the script, with fewer workers, which does not try to restart any worker: {code:java}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect with the S3 connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
{code}
When the script ends, I have one worker with connector #1 tasks, and the same worker with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
 120 "worker_id": "k1:8081"
{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
 120 "worker_id": "k1:8081"
{code}
Then I wait 3 minutes and get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
  60 "worker_id": "k2:8082"
  60 "worker_id": "k3:8083"
{code}
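The `jq .tasks | grep worker_id | sort | uniq -c` pipeline above simply tallies tasks per worker from the Connect REST status. A minimal Python sketch of the same tally; the payload shape follows the `/connectors/<name>/status` endpoint, but the sample data below is hypothetical:

```python
from collections import Counter

def tasks_per_worker(status: dict) -> Counter:
    """Count how many tasks of one connector run on each Connect worker."""
    return Counter(task["worker_id"] for task in status.get("tasks", []))

# Hypothetical status payload mirroring the unbalanced outputs above:
# all 120 tasks of s3-connector1 assigned to worker k1
status = {
    "name": "s3-connector1",
    "tasks": [{"id": i, "state": "RUNNING", "worker_id": "k1:8081"} for i in range(120)],
}
print(tasks_per_worker(status))  # -> Counter({'k1:8081': 120})
```

A balanced assignment of 120 tasks over three workers would instead show 40 per worker in this counter.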
[jira] [Created] (KAFKA-17048) Document how to use KIP-853
José Armando García Sancio created KAFKA-17048: -------------------------------------------------- Summary: Document how to use KIP-853 Key: KAFKA-17048 URL: https://issues.apache.org/jira/browse/KAFKA-17048 Project: Kafka Issue Type: Sub-task Reporter: José Armando García Sancio This should include:
# Changes to the quick start
# Operational changes in the Apache Kafka documentation
# Recommended configuration for KIP-853
# Commands and a runbook for creating a cluster
# Commands and metrics for monitoring a KRaft cluster with KIP-853
-- This message was sent by Atlassian Jira (v8.20.10#820010)
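As a sketch of the runbook items, the KIP-853 flow in a 3.9 build is expected to look roughly like this; the cluster id and config paths are hypothetical, and the exact flags should be verified against the released documentation:

```
# Format a standalone controller as the initial KRaft voter (KIP-853)
bin/kafka-storage.sh format --cluster-id MkU3OEVBNTcwNTJENDM2Qg \
  --standalone --config config/kraft/controller.properties

# Inspect quorum status and the dynamic voter set
bin/kafka-metadata-quorum.sh --bootstrap-controller localhost:9093 describe --status
```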
[jira] [Commented] (KAFKA-16781) Expose advertised.listeners in controller node
[ https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860554#comment-17860554 ] Gantigmaa Selenge commented on KAFKA-16781: --- [~showuon] I think we can close this issue as it's already implemented in [https://github.com/apache/kafka/pull/16235] > Expose advertised.listeners in controller node > -- > > Key: KAFKA-16781 > URL: https://issues.apache.org/jira/browse/KAFKA-16781 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Gantigmaa Selenge >Priority: Major > Labels: need-kip, newbie, newbie++ > > After > [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration], > we allow clients to talk to the KRaft controller node directly. But unlike > broker nodes, we don't allow users to configure advertised.listeners for > clients to connect to. Without this config, the client cannot connect to the > controller node if the controller is sitting behind a NAT network while the > client is in the external network. -- This message was sent by Atlassian Jira (v8.20.10#820010)
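For reference, the shape such a controller config would take, by analogy with the broker-side settings; the hostname is hypothetical and the exact supported form is defined by the pull request linked above:

```properties
process.roles=controller
node.id=1
listeners=CONTROLLER://0.0.0.0:9093
# Externally reachable address that clients behind the NAT boundary would use
advertised.listeners=CONTROLLER://controller-1.public.example.com:9093
controller.listener.names=CONTROLLER
```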
[jira] [Created] (KAFKA-17049) unbalanced connectors
yazgoo created KAFKA-17049: -------------------------------------------------- Summary: unbalanced connectors Key: KAFKA-17049 URL: https://issues.apache.org/jira/browse/KAFKA-17049 Project: Kafka Issue Type: Bug Reporter: yazgoo This follows https://issues.apache.org/jira/browse/KAFKA-10413 When running the following script (start one worker, declare two connectors on it, then add two more workers): {code:java}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect with the S3 connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
{code}
When the script ends, I have one worker with connector #1 tasks, and the same worker with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
 120 "worker_id": "k1:8081"
{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
 120 "worker_id": "k1:8081"
{code}
Then I wait 3 minutes and get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
  60 "worker_id": "k2:8082"
  60 "worker_id": "k3:8083"
{code}
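The readiness loop in the script (sleep, curl, retry, break) can be factored into a generic poll helper. A sketch with the probe injected as a callable so it is not tied to curl or localhost:8081; the function name and the stubbed probe are hypothetical:

```python
import time

def wait_until_up(probe, retries=60, delay=5.0):
    """Poll `probe` until it returns True; raise TimeoutError if it never does.

    `probe` is any zero-argument callable, e.g. an HTTP GET against the
    Connect REST root (http://localhost:8081/ in the script above).
    """
    for _ in range(retries):
        if probe():
            return
        time.sleep(delay)
    raise TimeoutError("service did not come up")

# Stubbed probe that succeeds on the third poll, standing in for the REST check
calls = {"n": 0}
def fake_probe():
    calls["n"] += 1
    return calls["n"] >= 3

wait_until_up(fake_probe, retries=5, delay=0.0)
print(calls["n"])  # -> 3
```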
[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860562#comment-17860562 ] yazgoo commented on KAFKA-10413: I created https://issues.apache.org/jira/browse/KAFKA-17049 > rebalancing leads to unevenly balanced connectors > - > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 2.5.1 >Reporter: yazgoo >Assignee: rameshkrishnan muthusamy >Priority: Major > Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > Attachments: connect_worker_balanced.png, rebalance.sh > > > Hi, > With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, > if a Connect instance disappears, or a new one appears, we're seeing unbalanced > consumption, much like mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one Kafka Connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestion on what we could do to mitigate this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-17049: --- Component/s: connect > unbalanced connectors > - > > Key: KAFKA-17049 > URL: https://issues.apache.org/jira/browse/KAFKA-17049 > Project: Kafka > Issue Type: Bug > Components: connect > Reporter: yazgoo > Priority: Major
--entrypoint bash \ confluentinc/cp-kafka-connect:7.6.1 \ -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run" } cleanup_docker_env() { docker volume prune -f for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio do dkill "$container" done } launch_kafka() { docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka for i in {1..2} do # Create a Kafka topic docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i" write_topic "$i" done for topic in connect-configs connect-offsets connect-status do # with cleanup.policy=compact, we can't have more than 1 partition docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact done } cleanup_docker_env launch_kafka launch_minio launch_kafka_connect 1 while true do sleep 5 # Check if Kafka Connect is up curl http://localhost:8081/ || continue break done sleep 10 for i in {1..2} do # Set up a connector curl -X POST -H "Content-Type: application/json" --data '{ "name": "s3-connector'"$i"'", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "120", "topics": "test_topic'"$i"'", "s3.region": "us-east-1", "store.url": "http://0.0.0.0:9000";, "s3.bucket.name": "my-minio-bucket", "s3.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "schema.compatibility": "NONE" } }' http://localhost:8081/connectors done launch_kafka_connect 2 launch_kafka_connect 3 {code} When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks. {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 120 "worker_id": "k1:8081"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 120 "worker_id": "k1:8081" {code} Then I wait 3 minutes And I get the final state: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 60 "worker_id": "k2:8082" 60 "worker_id": "k3:8083"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep
[jira] [Updated] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-17049: --- Description: This follows https://issues.apache.org/jira/browse/KAFKA-10413 When runnning the following script, which 1. runs one worker 2. declares two connectors 3. adds two more workers {code:java} #!/bin/bash set -xe dkill() { docker stop "$1" || true docker rm -v -f "$1" || true } write_topic() { # write 200 messages to the topic json='{"name": "test"}' docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1" } launch_minio() { # Launch Minio (Fake S3) docker run --network host -d --name minio \ -e MINIO_ROOT_USER=minioadmin \ -e MINIO_ROOT_PASSWORD=minioadmin \ minio/minio server --console-address :9001 /data docker exec -it minio mkdir /data/my-minio-bucket } launch_kafka_connect() { # Start Kafka Connect with S3 Connector docker run --network host -d --name "kafka-connect$1" \ -e AWS_ACCESS_KEY_ID=minioadmin \ -e AWS_SECRET_ACCESS_KEY=minioadmin \ -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \ -e CONNECT_LISTENERS="http://localhost:808$1"; \ -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \ -e CONNECT_REST_PORT="808$1" \ -e CONNECT_GROUP_ID="connect-cluster" \ -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \ -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \ -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \ -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \ 
--entrypoint bash \ confluentinc/cp-kafka-connect:7.6.1 \ -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run" } cleanup_docker_env() { docker volume prune -f for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio do dkill "$container" done } launch_kafka() { docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka for i in {1..2} do # Create a Kafka topic docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i" write_topic "$i" done for topic in connect-configs connect-offsets connect-status do # with cleanup.policy=compact, we can't have more than 1 partition docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact done } cleanup_docker_env launch_kafka launch_minio launch_kafka_connect 1 while true do sleep 5 # Check if Kafka Connect is up curl http://localhost:8081/ || continue break done sleep 10 for i in {1..2} do # Set up a connector curl -X POST -H "Content-Type: application/json" --data '{ "name": "s3-connector'"$i"'", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "120", "topics": "test_topic'"$i"'", "s3.region": "us-east-1", "store.url": "http://0.0.0.0:9000";, "s3.bucket.name": "my-minio-bucket", "s3.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "schema.compatibility": "NONE" } }' http://localhost:8081/connectors done launch_kafka_connect 2 launch_kafka_connect 3 {code} When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks. {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 120 "worker_id": "k1:8081"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 120 "worker_id": "k1:8081" {code} Then I wait 3 minutes And I get the final state: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 60 "worker_id": "k2:8082" 60 "worker_id": "k3:8083"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep
[jira] [Updated] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-17049: --- Description: This follows https://issues.apache.org/jira/browse/KAFKA-10413 When runnning the following script, which 1. runs one worker 2. declares two connectors 3. adds two more workers {code:java} #!/bin/bash set -xe dkill() { docker stop "$1" || true docker rm -v -f "$1" || true } launch_minio() { # Launch Minio (Fake S3) docker run --network host -d --name minio \ -e MINIO_ROOT_USER=minioadmin \ -e MINIO_ROOT_PASSWORD=minioadmin \ minio/minio server --console-address :9001 /data docker exec -it minio mkdir /data/my-minio-bucket } launch_kafka_connect() { # Start Kafka Connect with S3 Connector docker run --network host -d --name "kafka-connect$1" \ -e AWS_ACCESS_KEY_ID=minioadmin \ -e AWS_SECRET_ACCESS_KEY=minioadmin \ -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \ -e CONNECT_LISTENERS="http://localhost:808$1"; \ -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \ -e CONNECT_REST_PORT="808$1" \ -e CONNECT_GROUP_ID="connect-cluster" \ -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \ -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \ -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \ -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \ --entrypoint bash \ confluentinc/cp-kafka-connect:7.6.1 \ -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run" } cleanup_docker_env() { docker volume prune -f for container in $(for i in {1..9}; 
do echo "kafka-connect$i";done) kafka minio do dkill "$container" done } launch_kafka() { docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka for i in {1..2} do # Create a Kafka topic docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i" done for topic in connect-configs connect-offsets connect-status do # with cleanup.policy=compact, we can't have more than 1 partition docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact done } cleanup_docker_env launch_kafka launch_minio launch_kafka_connect 1 while true do sleep 5 # Check if Kafka Connect is up curl http://localhost:8081/ || continue break done sleep 10 for i in {1..2} do # Set up a connector curl -X POST -H "Content-Type: application/json" --data '{ "name": "s3-connector'"$i"'", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "12", "topics": "test_topic'"$i"'", "s3.region": "us-east-1", "store.url": "http://0.0.0.0:9000";, "s3.bucket.name": "my-minio-bucket", "s3.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "schema.compatibility": "NONE" } }' http://localhost:8081/connectors done launch_kafka_connect 2 launch_kafka_connect 3 {code} When the script ends, I have one worker with connector #1 tasks, the other one with connector #2 tasks. 
{code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 12 "worker_id": "k1:8081"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 12 "worker_id": "k1:8081" {code} Then I wait 3 minutes And I get the final state: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 6 "worker_id": "k2:8082" 6 "worker_id": "k3:8083"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 8 "worker_id": "k1:8081" 2 "worker_id": "k2:8082" 2 "worker_id": "k3:8083" {code} In the end, we indeed get 8 tasks on each workers, but for distribution reasons , I think it should be (4, 4, 4) for each connector, bec
[jira] [Updated] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-17049: --- Description: This follows https://issues.apache.org/jira/browse/KAFKA-10413 When runnning the following script, which 1. runs one worker 2. declares two connectors 3. adds two more workers {code:java} #!/bin/bash set -xe dkill() { docker stop "$1" || true docker rm -v -f "$1" || true } launch_minio() { # Launch Minio (Fake S3) docker run --network host -d --name minio \ -e MINIO_ROOT_USER=minioadmin \ -e MINIO_ROOT_PASSWORD=minioadmin \ minio/minio server --console-address :9001 /data docker exec -it minio mkdir /data/my-minio-bucket } launch_kafka_connect() { # Start Kafka Connect with S3 Connector docker run --network host -d --name "kafka-connect$1" \ -e AWS_ACCESS_KEY_ID=minioadmin \ -e AWS_SECRET_ACCESS_KEY=minioadmin \ -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \ -e CONNECT_LISTENERS="http://localhost:808$1"; \ -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \ -e CONNECT_REST_PORT="808$1" \ -e CONNECT_GROUP_ID="connect-cluster" \ -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \ -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \ -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \ -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \ --entrypoint bash \ confluentinc/cp-kafka-connect:7.6.1 \ -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run" } cleanup_docker_env() { docker volume prune -f for container in $(for i in {1..9}; 
do echo "kafka-connect$i";done) kafka minio do dkill "$container" done } launch_kafka() { docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka for i in {1..2} do # Create a Kafka topic docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i" done for topic in connect-configs connect-offsets connect-status do # with cleanup.policy=compact, we can't have more than 1 partition docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact done } cleanup_docker_env launch_kafka launch_minio launch_kafka_connect 1 while true do sleep 5 # Check if Kafka Connect is up curl http://localhost:8081/ || continue break done sleep 10 for i in {1..2} do # Set up a connector curl -X POST -H "Content-Type: application/json" --data '{ "name": "s3-connector'"$i"'", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "12", "topics": "test_topic'"$i"'", "s3.region": "us-east-1", "store.url": "http://0.0.0.0:9000";, "s3.bucket.name": "my-minio-bucket", "s3.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "schema.compatibility": "NONE" } }' http://localhost:8081/connectors done launch_kafka_connect 2 launch_kafka_connect 3 {code} When the script ends, I have one worker with connector #1 tasks, the other one with connector #2 tasks. 
{code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 12 "worker_id": "k1:8081"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 12 "worker_id": "k1:8081" {code} Then I wait 3 minutes And I get the final state: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 6 "worker_id": "k2:8082" 6 "worker_id": "k3:8083"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 8 "worker_id": "k1:8081" 2 "worker_id": "k2:8082" 2 "worker_id": "k3:8083" {code} In the end, we indeed get 8 tasks on each workers, but for distribution reasons , I think it should be (4, 4, 4) for each connector, bec
[jira] [Updated] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-17049: --- Description: This follows https://issues.apache.org/jira/browse/KAFKA-10413 When runnning the following script, which 1. runs one worker 2. declares two connectors 3. adds two more workers {code:java} #!/bin/bash set -xe dkill() { docker stop "$1" || true docker rm -v -f "$1" || true } launch_minio() { # Launch Minio (Fake S3) docker run --network host -d --name minio \ -e MINIO_ROOT_USER=minioadmin \ -e MINIO_ROOT_PASSWORD=minioadmin \ minio/minio server --console-address :9001 /data docker exec -it minio mkdir /data/my-minio-bucket } launch_kafka_connect() { # Start Kafka Connect with S3 Connector docker run --network host -d --name "kafka-connect$1" \ -e AWS_ACCESS_KEY_ID=minioadmin \ -e AWS_SECRET_ACCESS_KEY=minioadmin \ -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \ -e CONNECT_LISTENERS="http://localhost:808$1"; \ -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \ -e CONNECT_REST_PORT="808$1" \ -e CONNECT_GROUP_ID="connect-cluster" \ -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \ -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \ -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \ -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \ --entrypoint bash \ confluentinc/cp-kafka-connect:7.6.1 \ -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run" } cleanup_docker_env() { docker volume prune -f for container in $(for i in {1..9}; 
do echo "kafka-connect$i";done) kafka minio do dkill "$container" done } launch_kafka() { docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka for i in {1..2} do # Create a Kafka topic docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i" done for topic in connect-configs connect-offsets connect-status do # with cleanup.policy=compact, we can't have more than 1 partition docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact done } cleanup_docker_env launch_kafka launch_minio launch_kafka_connect 1 while true do sleep 5 # Check if Kafka Connect is up curl http://localhost:8081/ || continue break done sleep 10 for i in {1..2} do # Set up a connector curl -X POST -H "Content-Type: application/json" --data '{ "name": "s3-connector'"$i"'", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "12", "topics": "test_topic'"$i"'", "s3.region": "us-east-1", "store.url": "http://0.0.0.0:9000";, "s3.bucket.name": "my-minio-bucket", "s3.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "schema.compatibility": "NONE" } }' http://localhost:8081/connectors done launch_kafka_connect 2 launch_kafka_connect 3 {code} When the script ends, I have one worker taking all the connectors/tasks: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 12 "worker_id": "k1:8081"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 
12 "worker_id": "k1:8081" {code} Then I wait 3 minutes And I get the final state: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 6 "worker_id": "k2:8082" 6 "worker_id": "k3:8083"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 8 "worker_id": "k1:8081" 2 "worker_id": "k2:8082" 2 "worker_id": "k3:8083" {code} In the end, we indeed get 8 tasks on each workers, but for distribution reasons , I think it should be (4, 4, 4) for each connector, because all connectors don't do the
[jira] [Updated] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-17049: --- Description: This follows https://issues.apache.org/jira/browse/KAFKA-10413 When runnning the following script, which 1. runs one worker 2. declares two connectors 3. adds two more workers {code:java} #!/bin/bash set -xe dkill() { docker stop "$1" || true docker rm -v -f "$1" || true } launch_minio() { # Launch Minio (Fake S3) docker run --network host -d --name minio \ -e MINIO_ROOT_USER=minioadmin \ -e MINIO_ROOT_PASSWORD=minioadmin \ minio/minio server --console-address :9001 /data docker exec -it minio mkdir /data/my-minio-bucket } launch_kafka_connect() { # Start Kafka Connect with S3 Connector docker run --network host -d --name "kafka-connect$1" \ -e AWS_ACCESS_KEY_ID=minioadmin \ -e AWS_SECRET_ACCESS_KEY=minioadmin \ -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \ -e CONNECT_LISTENERS="http://localhost:808$1"; \ -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \ -e CONNECT_REST_PORT="808$1" \ -e CONNECT_GROUP_ID="connect-cluster" \ -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \ -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \ -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \ -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \ --entrypoint bash \ confluentinc/cp-kafka-connect:7.6.1 \ -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run" } cleanup_docker_env() { docker volume prune -f for container in $(for i in {1..9}; 
do echo "kafka-connect$i";done) kafka minio do dkill "$container" done } launch_kafka() { docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka for i in {1..2} do # Create a Kafka topic docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i" done for topic in connect-configs connect-offsets connect-status do # with cleanup.policy=compact, we can't have more than 1 partition docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact done } cleanup_docker_env launch_kafka launch_minio launch_kafka_connect 1 while true do sleep 5 # Check if Kafka Connect is up curl http://localhost:8081/ || continue break done sleep 10 for i in {1..2} do # Set up a connector curl -X POST -H "Content-Type: application/json" --data '{ "name": "s3-connector'"$i"'", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "tasks.max": "12", "topics": "test_topic'"$i"'", "s3.region": "us-east-1", "store.url": "http://0.0.0.0:9000";, "s3.bucket.name": "my-minio-bucket", "s3.part.size": "5242880", "flush.size": "3", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "io.confluent.connect.s3.format.json.JsonFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "schema.compatibility": "NONE" } }' http://localhost:8081/connectors done launch_kafka_connect 2 launch_kafka_connect 3 {code} When the script ends, I have the first worker taking all the connectors/tasks: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 12 "worker_id": "k1:8081"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | 
uniq -c 12 "worker_id": "k1:8081" {code} Then I wait a few minutes, And I get the final state: {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks |grep worker_id | sort | uniq -c 6 "worker_id": "k2:8082" 6 "worker_id": "k3:8083"{code} {code:java} ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks |grep worker_id | sort | uniq -c 8 "worker_id": "k1:8081" 2 "worker_id": "k2:8082" 2 "worker_id": "k3:8083" {code} In the end, we indeed get 8 tasks on each workers, but for distribution reasons , I think it should be (4, 4, 4) for each connector, because all connectors do
[jira] [Updated] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-17049:
---
Description:
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which
1. runs one worker
2. declares two connectors
3. adds two more workers

{code:java}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

launch_minio() {
  # Launch Minio (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect with the S3 connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "12",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
{code}

When the script ends, I have the first worker taking all the connectors/tasks:

{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
  12  "worker_id": "k1:8081"
{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
  12  "worker_id": "k1:8081"
{code}

Then I wait 3 minutes and get the final state:

{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
   6  "worker_id": "k2:8082"
   6  "worker_id": "k3:8083"
{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
   8  "worker_id": "k1:8081"
   2  "worker_id": "k2:8082"
   2  "worker_id": "k3:8083"
{code}

In the end, we indeed get 8 tasks on each worker, but for distribution reasons I think it should be (4, 4, 4) for each connector, because all connectors don't d
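The (4, 4, 4) expectation in the report is just the balanced per-connector split; a minimal sketch of the arithmetic (Python, using the 12-task and 3-worker numbers from the reproduction above):

```python
# Balanced per-connector distribution: each of the 3 workers should get
# 12/3 = 4 tasks of EACH connector, not merely 8 tasks in total.
def per_connector_targets(tasks_per_connector, workers):
    base, extra = divmod(tasks_per_connector, workers)
    return [base + (1 if i < extra else 0) for i in range(workers)]

print(per_connector_targets(12, 3))  # -> [4, 4, 4]
```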
[jira] [Updated] (KAFKA-16228) Add remote log metadata flag to the dump log tool
[ https://issues.apache.org/jira/browse/KAFKA-16228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-16228:
---
Description:
It would be good to improve the kafka-dump-log.sh tool by adding a decode flag for __remote_log_metadata records.
{code}
bin/kafka-dump-log.sh --remote-log-metadata-decoder --files /opt/kafka/data/__remote_log_metadata-0/.log
{code}

was:
It would be good to improve the kafka-dump-log.sh tool by adding a decode flag for __remote_log_metadata records. Something like the following would be useful for debugging.
{code}
bin/kafka-dump-log.sh --remote-log-metadata-decoder --files /opt/kafka/data/__remote_log_metadata-0/.log
{code}

> Add remote log metadata flag to the dump log tool
> -
>
> Key: KAFKA-16228
> URL: https://issues.apache.org/jira/browse/KAFKA-16228
> Project: Kafka
> Issue Type: New Feature
> Components: Tiered-Storage
> Affects Versions: 3.6.1
> Reporter: Federico Valeri
> Assignee: Federico Valeri
> Priority: Major
> Labels: kip-required
> Fix For: 3.9.0
>
> It would be good to improve the kafka-dump-log.sh tool by adding a decode flag for __remote_log_metadata records.
> {code}
> bin/kafka-dump-log.sh --remote-log-metadata-decoder --files /opt/kafka/data/__remote_log_metadata-0/.log
> {code}
--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860586#comment-17860586 ] Greg Harris commented on KAFKA-17049:
---
Thank you for providing a detailed reproduction case. I wrote this unit test in IncrementalCooperativeAssignorTest that displays the same behavior:
{noformat}
@Test
public void checkIndividualConnectorBalance() {
    connectors.clear();
    addNewConnector("connector1", 12);
    performStandardRebalance();
    addNewConnector("connector2", 12);
    performStandardRebalance();
    addNewEmptyWorkers("worker2");
    performStandardRebalance();
    performStandardRebalance();
    addNewEmptyWorkers("worker3");
    performStandardRebalance();
    performStandardRebalance();
}{noformat}
The resulting assignment is:
{noformat}
"worker1" -> "{ connectorIds=[connector2], taskIds=[connector2-4, connector2-5, connector2-6, connector2-7, connector2-8, connector2-9, connector2-10, connector2-11]}"
"worker2" -> "{ connectorIds=[connector1], taskIds=[connector1-4, connector1-5, connector1-6, connector1-7, connector1-8, connector1-9, connector1-10, connector1-11]}"
"worker3" -> "{ connectorIds=[], taskIds=[connector1-0, connector1-1, connector1-2, connector1-3, connector2-0, connector2-1, connector2-2, connector2-3]}"{noformat}
I think the root cause is that the order in which load-balancing revocations take place is defined by the iteration order of this set, rather than some more intentional strategy:
[https://github.com/apache/kafka/blob/3ebad6349de7d121a31f9d47c5ede7d6bbfac4d1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L739-L754]

[~yazgoo] Are you interested in working on this? I think this will require some changes to which revocations the assignor generates.
> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: yazgoo
> Priority: Major
>
> (full reproduction script quoted from the issue description above)
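To illustrate the iteration-order root cause pointed at in the comment above, here is a toy sketch (plain Python, not the assignor's actual data structures or algorithm): revoking the first N tasks in whatever order a collection happens to yield can strip one connector entirely, while alternating across connectors keeps each connector balanced.

```python
from collections import Counter

# worker1's load in the reproduction: 12 tasks from each of two connectors;
# 16 of the 24 must be revoked so three workers end up with 8 tasks each.
tasks = [f"connector{c}-{t}" for c in (1, 2) for t in range(12)]

def revoke_in_iteration_order(tasks, n):
    # Take whatever comes first, as an unordered set iteration might.
    return tasks[:n]

def revoke_round_robin(tasks, n):
    # Alternate between connectors so revocations stay balanced per connector.
    pools = {}
    for t in tasks:
        pools.setdefault(t.rsplit("-", 1)[0], []).append(t)
    revoked, order = [], list(pools.values())
    i = 0
    while len(revoked) < n:
        pool = order[i % len(order)]
        if pool:
            revoked.append(pool.pop(0))
        i += 1
    return revoked

def counts(revoked):
    return Counter(t.rsplit("-", 1)[0] for t in revoked)

print(counts(revoke_in_iteration_order(tasks, 16)))  # all of connector1, part of connector2
print(counts(revoke_round_robin(tasks, 16)))         # 8 from each connector
```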
[jira] [Assigned] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-17049:
---
Assignee: Greg Harris

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: yazgoo
> Assignee: Greg Harris
> Priority: Major
>
> (full reproduction script quoted from the issue description above)
[jira] [Assigned] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris reassigned KAFKA-17049:
---
Assignee: (was: Greg Harris)

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: yazgoo
> Priority: Major
>
> (full reproduction script quoted from the issue description above)
[jira] [Assigned] (KAFKA-8812) Rebalance Producers
[ https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-8812:
---
Assignee: Bruno Cadonna

> Rebalance Producers
> ---
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.3.0
> Reporter: Werner Daehn
> Assignee: Bruno Cadonna
> Priority: Major
> Labels: kip
>
> [KIP-509|https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers]
> Please bear with me. Initially this thought sounds stupid, but it has its merits.
>
> How do you build a distributed producer at the moment? You use Kafka Connect, which in turn requires a cluster that tells each instance which partitions it produces.
> On the consumer side it is different. There, Kafka itself does the data distribution. If you have 10 Kafka partitions and 10 consumers, each will get data for one partition. With 5 consumers, each will get data from two partitions. And if there is only a single consumer active, it gets all data. All is managed by Kafka; all you have to do is start as many consumers as you want.
>
> I'd like to suggest something similar for the producers. A producer would tell Kafka that its source has 10 partitions. The Kafka server then responds with the list of partitions this instance shall be responsible for. If it is the only producer, the response would be all 10 partitions. If a second instance starts up, the first instance would be told to produce data for partitions 1-5 and the new one for partitions 6-10. If a producer fails to respond with an alive packet, a rebalance happens, informing the active producer to take more load, and the dead producer will get an error when it sends data again.
> For restart, the producer rebalance also has to send the starting point from which to resume producing, of course. It would be best if this were a user-generated pointer and not the topic offset; then it can be e.g. a database system change number, a database transaction id, or something similar.
--
This message was sent by Atlassian Jira (v8.20.10#820010)
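The consumer-side arithmetic the proposal leans on (10 partitions across 1, 2, 5, or 10 instances) can be sketched as a contiguous range assignment. This is an illustration only (Python, zero-based partition ids), not a proposed Kafka API:

```python
def range_assign(num_partitions, num_instances):
    """Split partitions 0..num_partitions-1 into contiguous ranges,
    one per instance, the way the proposal divides 10 partitions."""
    base, extra = divmod(num_partitions, num_instances)
    ranges, start = [], 0
    for i in range(num_instances):
        size = base + (1 if i < extra else 0)
        ranges.append(list(range(start, start + size)))
        start += size
    return ranges

print(range_assign(10, 1))  # a single instance owns all 10 partitions
print(range_assign(10, 2))  # the "first five / last five" split from the proposal
print(range_assign(10, 5))  # two partitions per instance
```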
[jira] [Assigned] (KAFKA-8812) Rebalance Producers
[ https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-8812:
---
Assignee: (was: Bruno Cadonna)

> Rebalance Producers
> ---
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.3.0
> Reporter: Werner Daehn
> Priority: Major
> Labels: kip
>
> (KIP-509 proposal quoted from the issue description above)
--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13574) NotLeaderOrFollowerException thrown for a successful send
[ https://issues.apache.org/jira/browse/KAFKA-13574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860637#comment-17860637 ] Laurenceau Julien commented on KAFKA-13574:
---
We are two years on from this bug report, which asserts that exactly-once processing is broken. I see some ideas, but no beginning of a solution. Is there any workaround or fix? Do you think this is not important? Maybe a warning notice should be added to the documentation, because people choosing to pay the price of exactly-once generally care a lot about consistency!

> NotLeaderOrFollowerException thrown for a successful send
> -
>
> Key: KAFKA-13574
> URL: https://issues.apache.org/jira/browse/KAFKA-13574
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.0.0
> Environment: openjdk version "11.0.13" 2021-10-19
> Reporter: Kyle Kingsbury
> Priority: Minor
> Labels: error-handling
>
> With org.apache.kafka/kafka-clients 3.0.0, under rare circumstances involving multiple node and network failures, I've observed a call to `producer.send()` throw `NotLeaderOrFollowerException` for a message which later appears in `consumer.poll()` return values.
> I don't have a reliable repro case for this yet, but the case I hit involved retries=1000, acks=all, and idempotence enabled. I suspect what might be happening here is that an initial attempt to send the message makes it to the server and is committed, but the acknowledgement is lost, e.g. due to timeout; the Kafka producer then automatically retries the send attempt, and on that retry hits a NotLeaderOrFollowerException, which is thrown back to the caller. If we interpret NotLeaderOrFollowerException as a definite failure, then this would constitute an aborted read.
> I've seen issues like this in a number of databases around client or > server-internal retry mechanisms, and I think the thing to do is: rather than > throwing the most *recent* error, throw the {*}most indefinite{*}. That way > clients know that their request may have actually succeeded, and they won't > (e.g.) attempt to re-submit a non-idempotent request again. > As a side note: is there... perhaps documentation on which errors in Kafka > are supposed to be definite vs indefinite? NotLeaderOrFollowerException is a > subclass of RetriableException, but it looks like RetriableException is more > about transient vs permanent errors than whether it's safe to retry. -- This message was sent by Atlassian Jira (v8.20.10#820010)
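The "throw the most indefinite error" suggestion above can be sketched as follows (Python, with made-up error names and a made-up classification; the Kafka client exposes no such classifier today):

```python
# Hypothetical classification: "indefinite" errors are those after which the
# write may still have been committed (e.g. a lost acknowledgement).
INDEFINITE = {"TimeoutException", "NetworkException", "DisconnectException"}

def send_outcome(attempt_errors):
    """Outcome for a send whose successive attempts raised the given errors
    (None means an attempt succeeded)."""
    if any(e is None for e in attempt_errors):
        return "ok"
    if any(e in INDEFINITE for e in attempt_errors):
        # Some attempt may have reached the broker: surface the indefinite
        # outcome instead of the most recent, definite-looking error.
        return "indefinite-failure"
    return "definite-failure"

# The reported scenario: the first attempt times out (ack lost, write actually
# committed), then the retry hits NotLeaderOrFollowerException.
print(send_outcome(["TimeoutException", "NotLeaderOrFollowerException"]))
# -> indefinite-failure
```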
[jira] [Commented] (KAFKA-17049) unbalanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860647#comment-17860647 ] yazgoo commented on KAFKA-17049:
---
Thanks [~gharris1727], I'll try and have a look at it!

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: yazgoo
> Priority: Major
>
> (full reproduction script quoted from the issue description above)
[jira] [Created] (KAFKA-17050) Revert group.version for 3.8 and 3.9
Justine Olshan created KAFKA-17050:
---
Summary: Revert group.version for 3.8 and 3.9
Key: KAFKA-17050
URL: https://issues.apache.org/jira/browse/KAFKA-17050
Project: Kafka
Issue Type: Task
Affects Versions: 3.8.0, 3.9.0
Reporter: Justine Olshan
Assignee: Justine Olshan

After much discussion on KAFKA-17011, we decided it would be best to simply remove the group version feature for 3.8. As for 3.9, [~dajac] said it would be easier for EA users of the group coordinator to have a single way to configure it. For 4.0 we can reintroduce it.
--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17011) SupportedFeatures.MinVersion incorrectly blocks v0
[ https://issues.apache.org/jira/browse/KAFKA-17011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan updated KAFKA-17011: --- Priority: Critical (was: Blocker) > SupportedFeatures.MinVersion incorrectly blocks v0 > -- > > Key: KAFKA-17011 > URL: https://issues.apache.org/jira/browse/KAFKA-17011 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0 >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Critical > Fix For: 3.8.0 > > > SupportedFeatures.MinVersion incorrectly blocks v0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16986) After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes
[ https://issues.apache.org/jira/browse/KAFKA-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860662#comment-17860662 ] Justine Olshan commented on KAFKA-16986:
---
[~viniciusxyz] just curious – this is a ZK cluster, I assume, since the upgrade was from an earlier version? And I'm curious if we have metadata responses for these producers (request logging).
I am also looking at a few more avenues on my end. It looks like somehow the topic ID is being removed from the producer's metadata cache, so the topic ID in a metadata response looks like the first instance of the topic ID. We included this so that in the upgrade from > 2.7 -> < 2.8 we would do the epoch reset correctly. It shouldn't trigger as often as your logs show, though. Checking the client code to see if there was some assumption made about retaining this ID.

> After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes
> -
>
> Key: KAFKA-16986
> URL: https://issues.apache.org/jira/browse/KAFKA-16986
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 3.0.1, 3.6.1
> Reporter: Vinicius Vieira dos Santos
> Priority: Minor
> Attachments: image.png
>
> When updating the Kafka broker from version 2.7.0 to 3.4.1, we noticed that the applications began to log the message "{*}Resetting the last seen epoch of partition PAYMENTS-0 to 0 since the associated topicId changed from null to szRLmiAiTs8Y0nI8b3Wz1Q{*}" in a very constant way; from what I understand this behavior is not expected, because the topic was not deleted and recreated, so it should simply use the cached data and not go through this client log line.
> We have some applications with around 15 topics and 40 partitions which means > around 600 log lines when metadata updates occur > The main thing for me is to know if this could indicate a problem or if I can > simply change the log level of the org.apache.kafka.clients.Metadata class to > warn without worries > > There are other reports of the same behavior like this: > [https://stackoverflow.com/questions/74652231/apache-kafka-resetting-the-last-seen-epoch-of-partition-why] > > *Some log occurrences over an interval of about 7 hours, each block refers to > an instance of the application in kubernetes* > > !image.png! > *My scenario:* > *Application:* > - Java: 21 > - Client: 3.6.1, also tested on 3.0.1 and has the same behavior > *Broker:* > - Cluster running on Kubernetes with the bitnami/kafka:3.4.1-debian-11-r52 > image > > *Producer Config* > > acks = -1 > auto.include.jmx.reporter = true > batch.size = 16384 > bootstrap.servers = [server:9092] > buffer.memory = 33554432 > client.dns.lookup = use_all_dns_ips > client.id = producer-1 > compression.type = gzip > connections.max.idle.ms = 54 > delivery.timeout.ms = 3 > enable.idempotence = true > interceptor.classes = [] > key.serializer = class > org.apache.kafka.common.serialization.ByteArraySerializer > linger.ms = 0 > max.block.ms = 6 > max.in.flight.requests.per.connection = 1 > max.request.size = 1048576 > metadata.max.age.ms = 30 > metadata.max.idle.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partitioner.adaptive.partitioning.enable = true > partitioner.availability.timeout.ms = 0 > partitioner.class = null > partitioner.ignore.keys = false > receive.buffer.bytes = 32768 > reconnect.backoff.max.ms = 1000 > reconnect.backoff.ms = 50 > request.timeout.ms = 3 > retries = 3 > retry.backoff.ms = 100 > sasl.client.callback.handler.class = null > sasl.jaas.config = [hidden] > sasl.kerberos.kinit.cmd = /usr/bin/kinit > 
sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.login.callback.handler.class = null > sasl.login.class = null > sasl.login.connect.timeout.ms = null > sasl.login.read.timeout.ms = null > sasl.login.refresh.buffer.seconds = 300 > sasl.login.refresh.min.period.seconds = 60 > sasl.login.refresh.window.factor = 0.8 > sasl.login.refresh.window.jitter = 0.05 > sasl.login.retry.backoff.max.ms = 1 > sasl.login.retry.backoff.ms = 100 > sasl.mechanism = PLAIN > sasl.oauthbearer.clock.skew.seconds = 30 > sasl.oauthbearer.expected.audience = null > sasl.oauthbearer.expected.issuer = null > sasl.oauthbearer.jwks.endpoint.refresh.ms = 36000
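If the concern is only log volume, the workaround the reporter asks about (raising the level of the `org.apache.kafka.clients.Metadata` logger) can be sketched as follows. The logger name comes from the report itself; the log4j 1.x properties syntax is an assumption about the application's logging backend, so adjust for logback or log4j2 as needed:

```properties
# Hedged sketch: silence the INFO-level "Resetting the last seen epoch..."
# lines by raising this one logger to WARN. This hides the noise but does
# not address the underlying metadata-cache question discussed above.
log4j.logger.org.apache.kafka.clients.Metadata=WARN
```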
[jira] [Updated] (KAFKA-17049) Incremental rebalances assign too many tasks for the same connector together
[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-17049: Summary: Incremental rebalances assign too many tasks for the same connector together (was: unbalanced connectors) > Incremental rebalances assign too many tasks for the same connector together > > > Key: KAFKA-17049 > URL: https://issues.apache.org/jira/browse/KAFKA-17049 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: yazgoo >Priority: Major > > This follows https://issues.apache.org/jira/browse/KAFKA-10413 > When running the following script, which > 1. runs one worker > 2. declares two connectors > 3. adds two more workers > > {code:java} > #!/bin/bash > set -xe > dkill() { > docker stop "$1" || true > docker rm -v -f "$1" || true > } > launch_minio() { > # Launch Minio (Fake S3) > docker run --network host -d --name minio \ > -e MINIO_ROOT_USER=minioadmin \ > -e MINIO_ROOT_PASSWORD=minioadmin \ > minio/minio server --console-address :9001 /data > docker exec -it minio mkdir /data/my-minio-bucket > } > launch_kafka_connect() { > # Start Kafka Connect with S3 Connector > docker run --network host -d --name "kafka-connect$1" \ > -e AWS_ACCESS_KEY_ID=minioadmin \ > -e AWS_SECRET_ACCESS_KEY=minioadmin \ > -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \ > -e CONNECT_LISTENERS="http://localhost:808$1" \ > -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \ > -e CONNECT_REST_PORT="808$1" \ > -e CONNECT_GROUP_ID="connect-cluster" \ > -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \ > -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \ > -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \ > -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ > -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" > \ > -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ > -e > CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \ > -e > 
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" > \ > -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \ > -e > CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \ > --entrypoint bash \ > confluentinc/cp-kafka-connect:7.6.1 \ > -c "confluent-hub install --no-prompt > confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run" > } > cleanup_docker_env() { > docker volume prune -f > for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka > minio > do > dkill "$container" > done > } > launch_kafka() { > docker run --network host --hostname localhost --ulimit nofile=65536:65536 > -d --name kafka -p 9092:9092 apache/kafka > for i in {1..2} > do > # Create a Kafka topic > docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create > --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 > --topic "test_topic$i" > done > for topic in connect-configs connect-offsets connect-status > do > # with cleanup.policy=compact, we can't have more than 1 partition > docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create > --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic > $topic --config cleanup.policy=compact > done > } > cleanup_docker_env > launch_kafka > launch_minio > launch_kafka_connect 1 > while true > do > sleep 5 > # Check if Kafka Connect is up > curl http://localhost:8081/ || continue > break > done > sleep 10 > for i in {1..2} > do > # Set up a connector > curl -X POST -H "Content-Type: application/json" --data '{ > "name": "s3-connector'"$i"'", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "tasks.max": "12", > "topics": "test_topic'"$i"'", > "s3.region": "us-east-1", > "store.url": "http://0.0.0.0:9000", > "s3.bucket.name": "my-minio-bucket", > "s3.part.size": "5242880", > "flush.size": "3", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": 
"io.confluent.connect.s3.format.json.JsonFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "schema.compatibility": "NONE" > } > }' http://localhost:8081/connectors > done > launch_kafka_connect 2 > launch_kafka_connect 3 > {code} > > > When the script ends, I have the first worker taking all the connectors/tasks: > {code:java} > ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks > |grep worker_id | sort | uniq -c > 12 "worker_id": "k1:8081"{code} > {code:java} > ❯ curl -s http://localhost:8
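For contrast with the all-on-one-worker result above, the spread the reporter expects can be illustrated with a toy round-robin assignment. This is a hedged sketch only: all names are illustrative and it has no relation to Connect's actual incremental cooperative assignor.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy sketch: deal tasks out to workers round-robin so the 12 tasks of one
// connector end up spread 4/4/4 across 3 workers instead of 12/0/0.
public class AssignSketch {
    static Map<String, List<String>> roundRobin(List<String> workers, List<String> tasks) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String w : workers) assignment.put(w, new ArrayList<>());
        for (int i = 0; i < tasks.size(); i++) {
            // i-th task goes to the (i mod #workers)-th worker
            assignment.get(workers.get(i % workers.size())).add(tasks.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> workers = List.of("k1:8081", "k2:8082", "k3:8083");
        List<String> tasks = new ArrayList<>();
        for (int i = 0; i < 12; i++) tasks.add("s3-connector1-" + i);
        roundRobin(workers, tasks).forEach((w, ts) ->
                System.out.println(w + " -> " + ts.size() + " tasks"));
        // each worker gets 4 tasks
    }
}
```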
[jira] [Resolved] (KAFKA-16781) Expose advertised.listeners in controller node
[ https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-16781. --- Fix Version/s: 3.9.0 Resolution: Fixed > Expose advertised.listeners in controller node > -- > > Key: KAFKA-16781 > URL: https://issues.apache.org/jira/browse/KAFKA-16781 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Gantigmaa Selenge >Priority: Major > Labels: need-kip, newbie, newbie++ > Fix For: 3.9.0 > > > After > [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration], > we allow clients to talk to the KRaft controller node directly. But unlike > broker nodes, we don't allow users to configure advertised.listeners for clients > to connect to. Without this config, the client cannot connect to the > controller node if the controller sits behind a NAT network while the > client is in the external network. -- This message was sent by Atlassian Jira (v8.20.10#820010)
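To make the gap concrete: a broker advertises a reachable endpoint via `advertised.listeners`, while a KRaft controller only has `listeners` and `controller.listener.names`. The fragment below sketches what the requested config might look like; the controller-side `advertised.listeners` line is purely hypothetical (the property name and semantics are what the KIP process would have to define), and the hostnames are made up.

```properties
# Existing KRaft controller config:
process.roles=controller
node.id=3000
listeners=CONTROLLER://0.0.0.0:9093
controller.listener.names=CONTROLLER

# What this issue asks for, by analogy with brokers -- hypothetical, not a
# real controller property at the time of this ticket:
advertised.listeners=CONTROLLER://controller-0.public.example.com:9093
```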
[jira] [Updated] (KAFKA-17025) KAFKA-17025: Producer throws uncaught exception in the io thread.
[ https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hongshun Wang updated KAFKA-17025: -- Summary: KAFKA-17025: Producer throws uncaught exception in the io thread. (was: KafkaThread and KafkaProducer expose method to set setUncaughtExceptionHandler) > KAFKA-17025: Producer throws uncaught exception in the io thread. > - > > Key: KAFKA-17025 > URL: https://issues.apache.org/jira/browse/KAFKA-17025 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 3.6.2 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Major > > When I use KafkaProducer, an OOM occurs but the KafkaProducer only logs it and does > nothing: > > {code:java} > ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread > 'kafka-producer-network-thread | producer-1': java.lang.OutOfMemoryError: > Direct buffer memory. > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) > at org.apache.kafka.clients.producer.internals.Sender.run > at java.lang.Thread.run > {code} > > > I tried to find out what happens: > 1. It seems that OutOfMemoryError, being an Error, is not captured when > org.apache.kafka.clients.producer.internals.Sender#run tries to catch an > Exception: > {code:java} > @Override > public void run() { > log.debug("Starting Kafka producer I/O thread."); > // main loop, runs until close is called > while (running) { > try { > runOnce(); > } catch (Exception e) { > log.error("Uncaught error in kafka producer I/O thread: ", e); > } > } > log.debug("Beginning shutdown of Kafka producer I/O thread, sending > remaining records."); > // okay we stopped accepting requests but there may still be > // requests in the transaction manager, accumulator or waiting for > acknowledgment, > // wait until these are completed. 
> while (!forceClose && ((this.accumulator.hasUndrained() || > this.client.inFlightRequestCount() > 0) || > hasPendingTransactionalRequests())) { > try { > runOnce(); > } catch (Exception e) { > log.error("Uncaught error in kafka producer I/O thread: ", e); > } > } > // Abort the transaction if any commit or abort didn't go through the > transaction manager's queue > while (!forceClose && transactionManager != null && > transactionManager.hasOngoingTransaction()) { > if (!transactionManager.isCompleting()) { > log.info("Aborting incomplete transaction due to shutdown"); > transactionManager.beginAbort(); > } > try { > runOnce(); > } catch (Exception e) { > log.error("Uncaught error in kafka producer I/O thread: ", e); > } > } > if (forceClose) { > // We need to fail all the incomplete transactional requests and > batches and wake up the threads waiting on > // the futures. > if (transactionManager != null) { > log.debug("Aborting incomplete transactional requests due to > forced shutdown"); > transactionManager.close(); > } > log.debug("Aborting incomplete batches due to forced shutdown"); > this.accumulator.abortIncompleteBatches(); > } > try { > this.client.close(); > } catch (Exception e) { > log.error("Failed to close network client", e); > } > log.debug("Shutdown of Kafka producer I/O thread has completed."); > } > {code} > > 2. Then KafkaThread catch uncaught exception and just log it: > {code:java} > public KafkaThread(final String name, Runnable runnable, boolean daemon) { > super(runnable, name); > configureThread(name, daemon); > } > private void configureThread(final String name, boolean daemon) { > setDaemon(daemon); > setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in > thread '{}':", name, e)); > }{code} > > To be honest, I don't understand why KafkaThread doing nothing but log it > when an uncaught exception occurs? 
Why not expose a method to set the > UncaughtExceptionHandler in KafkaThread or KafkaProducer, so that the user can > determine what to do with an uncaught exception, whether to rethrow it or just > ignore it?
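The two snippets above combine into the failure mode described: `Sender.run` catches `Exception`, but `OutOfMemoryError` is an `Error`, so it bypasses the catch and lands in the thread's uncaught-exception handler, which KafkaThread wires to a log-only lambda. A self-contained demonstration with a simulated error (illustrative names, not Kafka code):

```java
// Demonstrates why `catch (Exception e)` does not stop an Error: the Error
// propagates to the thread's UncaughtExceptionHandler instead. KafkaThread
// installs a handler that only logs; here we install one that records.
public class UncaughtDemo {
    static String run() throws InterruptedException {
        StringBuilder seen = new StringBuilder();
        Thread t = new Thread(() -> {
            try {
                throw new OutOfMemoryError("simulated"); // stand-in for a real OOM
            } catch (Exception e) {
                seen.append("caught-as-exception"); // never reached: Error is not an Exception
            }
        });
        // The kind of pluggable handler the reporter proposes exposing:
        t.setUncaughtExceptionHandler((thread, err) ->
                seen.append("handler:").append(err.getClass().getSimpleName()));
        t.start();
        t.join(); // join() makes the handler's write visible to this thread
        return seen.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // handler:OutOfMemoryError
    }
}
```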
[jira] [Created] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version
Chia-Ping Tsai created KAFKA-17051: -- Summary: ApiKeys#toHtml should exclude the APIs having unstable latest version Key: KAFKA-17051 URL: https://issues.apache.org/jira/browse/KAFKA-17051 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see the discussion: https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o The (released) docs should show only the APIs which are ready to be exposed publicly. Those APIs marked with "latestVersionUnstable=true" should be excluded.
[jira] [Commented] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version
[ https://issues.apache.org/jira/browse/KAFKA-17051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860710#comment-17860710 ] 黃竣陽 commented on KAFKA-17051: - I'm interested in this issue. Could you assign it to me? > ApiKeys#toHtml should exclude the APIs having unstable latest version > - > > Key: KAFKA-17051 > URL: https://issues.apache.org/jira/browse/KAFKA-17051 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > see the discussion: > https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o > The (released) docs should show only the APIs which are ready to be exposed > publicly. Those APIs marked with "latestVersionUnstable=true" should be > excluded.
[jira] [Assigned] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version
[ https://issues.apache.org/jira/browse/KAFKA-17051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17051: -- Assignee: 黃竣陽 (was: Chia-Ping Tsai) > ApiKeys#toHtml should exclude the APIs having unstable latest version > - > > Key: KAFKA-17051 > URL: https://issues.apache.org/jira/browse/KAFKA-17051 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: 黃竣陽 >Priority: Minor > > see the discussion: > https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o > The (released) docs should show only the APIs which are ready to be exposed > publicly. Those APIs marked with "latestVersionUnstable=true" should be > excluded.
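The proposed behavior can be illustrated generically. This sketch does not use the real `ApiKeys` internals (the `Api` record and `toHtml` signature here are invented for illustration); it only shows the filtering step the ticket asks for: skip any API whose latest version is flagged unstable before rendering HTML.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ToHtmlSketch {
    // Stand-in for an API's doc-relevant metadata (not the real ApiKeys type).
    record Api(String name, boolean latestVersionUnstable) {}

    static String toHtml(List<Api> apis) {
        return apis.stream()
                .filter(a -> !a.latestVersionUnstable()) // the proposed exclusion
                .map(a -> "<tr><td>" + a.name() + "</td></tr>")
                .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        String html = toHtml(List.of(
                new Api("Produce", false),
                new Api("ExperimentalApi", true))); // unstable latest: excluded from docs
        System.out.println(html); // <tr><td>Produce</td></tr>
    }
}
```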
[jira] [Resolved] (KAFKA-17022) Fix error-prone in KafkaApis#handleFetchRequest
[ https://issues.apache.org/jira/browse/KAFKA-17022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17022. Fix Version/s: 3.9.0 Resolution: Fixed > Fix error-prone in KafkaApis#handleFetchRequest > > > Key: KAFKA-17022 > URL: https://issues.apache.org/jira/browse/KAFKA-17022 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TengYao Chi >Priority: Minor > Fix For: 3.9.0 > > > `createResponse`[0] references a variable that is out of scope, which is > error-prone since the variable could be uninitialized when `createResponse` executes. We should do a > small refactor to pass `unconvertedFetchResponse` into `createResponse`. > [0] > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L939