[jira] [Resolved] (KAFKA-15623) Migrate remaining tests in streams module to JUnit 5

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-15623.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Migrate remaining tests in streams module to JUnit 5
> 
>
> Key: KAFKA-15623
> URL: https://issues.apache.org/jira/browse/KAFKA-15623
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: 黃竣陽
>Priority: Major
> Fix For: 3.9.0
>
>
> The following special case from `build.gradle` can be removed once this is 
> completed:
> {code:java}
> if (project.name == 'streams') {
>   useJUnitPlatform {
>     includeTags "integration"
>     includeTags "org.apache.kafka.test.IntegrationTest"
>     // Both engines are needed to run JUnit 4 tests alongside JUnit 5 tests.
>     // junit-vintage (JUnit 4) can be removed once the JUnit 4 migration is complete.
>     includeEngines "junit-vintage", "junit-jupiter"
>   }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-7339) Migrate from JUnit 4 to JUnit 5

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-7339.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Migrate from JUnit 4 to JUnit 5
> ---
>
> Key: KAFKA-7339
> URL: https://issues.apache.org/jira/browse/KAFKA-7339
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Chia-Ping Tsai
>Priority: Major
> Fix For: 3.9.0
>
>






[jira] [Updated] (KAFKA-16228) Add remote log metadata flag to the dump log tool

2024-06-27 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-16228:

Fix Version/s: 3.9.0

> Add remote log metadata flag to the dump log tool
> -
>
> Key: KAFKA-16228
> URL: https://issues.apache.org/jira/browse/KAFKA-16228
> Project: Kafka
>  Issue Type: New Feature
>  Components: Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: Federico Valeri
>Assignee: Federico Valeri
>Priority: Major
>  Labels: kip-required
> Fix For: 3.9.0
>
>
> It would be good to improve the kafka-dump-log.sh tool adding a decode flag 
> for __remote_log_metadata records. Something like the following would be 
> useful for debugging.
> {code}
> bin/kafka-dump-log.sh --remote-log-metadata-decoder --files 
> /opt/kafka/data/__remote_log_metadata-0/.log 
> {code}





[jira] [Created] (KAFKA-17045) Move MetadataLogConfig from kafka to kafka.raft

2024-06-27 Thread XiDuo You (Jira)
XiDuo You created KAFKA-17045:
-

 Summary: Move MetadataLogConfig from kafka to kafka.raft
 Key: KAFKA-17045
 URL: https://issues.apache.org/jira/browse/KAFKA-17045
 Project: Kafka
  Issue Type: Improvement
Reporter: XiDuo You


The MetadataLogConfig class belongs to raft; move the file to raft to match the 
package name.





[jira] [Created] (KAFKA-17046) Upgrade netty version to 4.1.111.Final

2024-06-27 Thread XiDuo You (Jira)
XiDuo You created KAFKA-17046:
-

 Summary: Upgrade netty version to 4.1.111.Final
 Key: KAFKA-17046
 URL: https://issues.apache.org/jira/browse/KAFKA-17046
 Project: Kafka
  Issue Type: Improvement
Reporter: XiDuo You








[jira] [Commented] (KAFKA-7342) Migrate streams modules to JUnit 5

2024-06-27 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860389#comment-17860389
 ] 

Mickael Maison commented on KAFKA-7342:
---

It looks like this has been done in 
https://issues.apache.org/jira/browse/KAFKA-15623. Marking as duplicate and 
closing.

> Migrate streams modules to JUnit 5
> --
>
> Key: KAFKA-7342
> URL: https://issues.apache.org/jira/browse/KAFKA-7342
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Christo Lolov
>Priority: Major
>






[jira] [Resolved] (KAFKA-7342) Migrate streams modules to JUnit 5

2024-06-27 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-7342.
---
Resolution: Duplicate

> Migrate streams modules to JUnit 5
> --
>
> Key: KAFKA-7342
> URL: https://issues.apache.org/jira/browse/KAFKA-7342
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Ismael Juma
>Assignee: Christo Lolov
>Priority: Major
>






[jira] [Resolved] (KAFKA-14109) Clean up JUnit 4 test infrastructure

2024-06-27 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-14109.

Resolution: Duplicate

> Clean up JUnit 4 test infrastructure
> 
>
> Key: KAFKA-14109
> URL: https://issues.apache.org/jira/browse/KAFKA-14109
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> We need to clean up the setup in 
> https://issues.apache.org/jira/browse/KAFKA-14108 once the JUnit 4 to JUnit 5 
> migration is complete.





[jira] [Commented] (KAFKA-14218) replace temp file handler with JUnit 5 Temporary Directory Support

2024-06-27 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14218?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860390#comment-17860390
 ] 

Mickael Maison commented on KAFKA-14218:


[~stillya] All tests have been updated to JUnit 5 now. Can you update your PR? 
Thanks

> replace temp file handler with JUnit 5 Temporary Directory Support
> --
>
> Key: KAFKA-14218
> URL: https://issues.apache.org/jira/browse/KAFKA-14218
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Luke Chen
>Assignee: Ganesh Sahu
>Priority: Major
>  Labels: Newbie, newbie
>
> We create many temp files in tests, and sometimes we forget to delete them 
> after use. Instead of polluting @AfterEach in each test, we should 
> consider using the JUnit 5 TempDirectory extension.
>  
> REF: 1. [https://github.com/apache/kafka/pull/12591#issuecomment-1243001431]
> 2. [https://www.baeldung.com/junit-5-temporary-directory]
>  
>  





[jira] [Updated] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-10413:
---
Attachment: rebalance.sh

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png, rebalance.sh
>
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
> if a Connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like what is described in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestions on what we could do to mitigate this?





[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860408#comment-17860408
 ] 

yazgoo commented on KAFKA-10413:


Hello, I launched the attached script:

[^rebalance.sh]

In one of my tests, I get one connector well balanced:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     15     "worker_id": "k2:8082"
     15     "worker_id": "k3:8083"
     15     "worker_id": "k4:8084"
     15     "worker_id": "k5:8085"
     15     "worker_id": "k6:8086"
     15     "worker_id": "k7:8087"
     15     "worker_id": "k8:8088"
     15     "worker_id": "k9:8089"
{code}
 


And the other one unbalanced:

{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
     27     "worker_id": "k1:8081"
     11     "worker_id": "k2:8082"
     12     "worker_id": "k3:8083"
     11     "worker_id": "k4:8084"
     12     "worker_id": "k5:8085"
     12     "worker_id": "k6:8086"
     11     "worker_id": "k7:8087"
     12     "worker_id": "k8:8088"
     12     "worker_id": "k9:8089"
{code}
 

Regards

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png, rebalance.sh
>
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
> if a Connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like what is described in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestions on what we could do to mitigate this?





[jira] [Resolved] (KAFKA-16754) Implement release acquired records functionality in SharePartition

2024-06-27 Thread Abhinav Dixit (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhinav Dixit resolved KAFKA-16754.
---
Fix Version/s: 4.0.0
   3.9.0
   Resolution: Fixed

> Implement release acquired records functionality in SharePartition
> --
>
> Key: KAFKA-16754
> URL: https://issues.apache.org/jira/browse/KAFKA-16754
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Abhinav Dixit
>Priority: Major
> Fix For: 4.0.0, 3.9.0
>
>






[jira] [Resolved] (KAFKA-16822) Abstract consumer group in coordinator to share functionality with share group

2024-06-27 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16822.
-
Fix Version/s: 3.9.0
   Resolution: Fixed

> Abstract consumer group in coordinator to share functionality with share group
> --
>
> Key: KAFKA-16822
> URL: https://issues.apache.org/jira/browse/KAFKA-16822
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Apoorv Mittal
>Assignee: Apoorv Mittal
>Priority: Major
> Fix For: 3.9.0
>
>






[jira] [Commented] (KAFKA-17046) Upgrade netty version to 4.1.111.Final

2024-06-27 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860474#comment-17860474
 ] 

黃竣陽 commented on KAFKA-17046:
-

I'm interested in this issue. Could you assign it to me?

> Upgrade netty version to 4.1.111.Final
> --
>
> Key: KAFKA-17046
> URL: https://issues.apache.org/jira/browse/KAFKA-17046
> Project: Kafka
>  Issue Type: Improvement
>Reporter: XiDuo You
>Priority: Minor
>






[jira] [Resolved] (KAFKA-12911) Configure automatic formatter for org.apache.kafka.streams.processor

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12911.

Resolution: Duplicate

> Configure automatic formatter for org.apache.kafka.streams.processor
> 
>
> Key: KAFKA-12911
> URL: https://issues.apache.org/jira/browse/KAFKA-12911
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>
> As an incremental approach to introducing an automatic code formatter, we will 
> configure the automatic formatter for the org.apache.kafka.streams.processor 
> package, with 127 (main) + 78 (test) = 205 files.





[jira] [Resolved] (KAFKA-12910) Configure automatic formatter for org.apache.kafka.streams.state

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-12910.

Resolution: Duplicate

> Configure automatic formatter for org.apache.kafka.streams.state
> 
>
> Key: KAFKA-12910
> URL: https://issues.apache.org/jira/browse/KAFKA-12910
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongjin Lee
>Assignee: Dongjin Lee
>Priority: Major
>
> As of 48379bd6e5, there are 893 Java files in the streams module.
> As an incremental approach to introducing an automatic code formatter, we will 
> configure the automatic formatter for the org.apache.kafka.streams.state 
> package, with 147 (main) + 91 (test) = 238 files.





[jira] [Resolved] (KAFKA-10787) Introduce an import order in Java sources

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10787.

Resolution: Fixed

> Introduce an import order in Java sources
> -
>
> Key: KAFKA-10787
> URL: https://issues.apache.org/jira/browse/KAFKA-10787
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Dongjin Lee
>Assignee: xuanzhang gong
>Priority: Major
> Fix For: 3.9.0
>
>
> At present, Kafka uses a relatively strict code style for Java code, 
> except for import order. For this reason, the code formatting settings of 
> local dev environments differ from person to person, resulting in 
> countless meaningless import-order changes in PRs.
> This issue aims to define and apply a 3-group import order, like the 
> following:
> 1. Project packages: {{kafka.*}}, {{org.apache.kafka.*}} 
> 2. Third Party packages: {{com.*}}, {{net.*}}, {{org.*}}
> 3. Java packages: {{java.*}}, {{javax.*}}
> Discussion Thread: 
> https://lists.apache.org/thread.html/rf6f49c845a3d48efe8a91916c8fbaddb76da17742eef06798fc5b24d%40%3Cdev.kafka.apache.org%3E
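
The 3-group order described in the issue can be sketched as a small classifier. This is only an illustration: `ImportOrderSketch`, `group` and `sort` are hypothetical names, not part of Kafka or of any formatter plugin, and alphabetical ordering inside a group is an assumption (the issue does not specify it).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not Kafka code.
class ImportOrderSketch {

    // Returns 1 for project packages, 2 for third-party packages, 3 for Java packages.
    static int group(String importName) {
        if (importName.startsWith("kafka.") || importName.startsWith("org.apache.kafka.")) {
            return 1;
        }
        if (importName.startsWith("java.") || importName.startsWith("javax.")) {
            return 3;
        }
        return 2; // com.*, net.*, org.* and, as an assumption, anything else
    }

    // Sorts by group first, then alphabetically within a group (assumed tie-break).
    static List<String> sort(List<String> imports) {
        List<String> sorted = new ArrayList<>(imports);
        sorted.sort((a, b) -> {
            int byGroup = Integer.compare(group(a), group(b));
            return byGroup != 0 ? byGroup : a.compareTo(b);
        });
        return sorted;
    }

    public static void main(String[] args) {
        List<String> imports = Arrays.asList(
                "java.util.List",
                "org.slf4j.Logger",
                "org.apache.kafka.common.TopicPartition",
                "com.fasterxml.jackson.databind.ObjectMapper",
                "kafka.server.KafkaConfig");
        sort(imports).forEach(System.out::println);
    }
}
```

The project prefixes are checked before the generic `org.*` case, since `org.apache.kafka.*` would otherwise land in the third-party group.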





[jira] [Comment Edited] (KAFKA-17046) Upgrade netty version to 4.1.111.Final

2024-06-27 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860474#comment-17860474
 ] 

黃竣陽 edited comment on KAFKA-17046 at 6/27/24 12:53 PM:
---

I'm interested in this issue. Could you assign it to me?


was (Author: JIRAUSER305187):
I'm interested in this issue. Could you assign it to me

> Upgrade netty version to 4.1.111.Final
> --
>
> Key: KAFKA-17046
> URL: https://issues.apache.org/jira/browse/KAFKA-17046
> Project: Kafka
>  Issue Type: Improvement
>Reporter: XiDuo You
>Priority: Minor
>






[jira] [Commented] (KAFKA-17046) Upgrade netty version to 4.1.111.Final

2024-06-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860476#comment-17860476
 ] 

Chia-Ping Tsai commented on KAFKA-17046:


It seems there is already a PR: https://github.com/apache/kafka/pull/16469

> Upgrade netty version to 4.1.111.Final
> --
>
> Key: KAFKA-17046
> URL: https://issues.apache.org/jira/browse/KAFKA-17046
> Project: Kafka
>  Issue Type: Improvement
>Reporter: XiDuo You
>Priority: Minor
>






[jira] [Assigned] (KAFKA-17046) Upgrade netty version to 4.1.111.Final

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17046:
--

Assignee: XiDuo You

> Upgrade netty version to 4.1.111.Final
> --
>
> Key: KAFKA-17046
> URL: https://issues.apache.org/jira/browse/KAFKA-17046
> Project: Kafka
>  Issue Type: Improvement
>Reporter: XiDuo You
>Assignee: XiDuo You
>Priority: Minor
>






[jira] [Created] (KAFKA-17047) Refactor Consumer group and shared classes with Share to modern package

2024-06-27 Thread Apoorv Mittal (Jira)
Apoorv Mittal created KAFKA-17047:
-

 Summary: Refactor Consumer group and shared classes with Share to 
modern package
 Key: KAFKA-17047
 URL: https://issues.apache.org/jira/browse/KAFKA-17047
 Project: Kafka
  Issue Type: Sub-task
Reporter: Apoorv Mittal
Assignee: Apoorv Mittal








[jira] [Resolved] (KAFKA-17028) FindCoordinator v6 initial implementation

2024-06-27 Thread Andrew Schofield (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Schofield resolved KAFKA-17028.
--
Fix Version/s: 3.9.0
   Resolution: Fixed

> FindCoordinator v6 initial implementation
> -
>
> Key: KAFKA-17028
> URL: https://issues.apache.org/jira/browse/KAFKA-17028
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Andrew Schofield
>Assignee: Andrew Schofield
>Priority: Major
> Fix For: 3.9.0
>
>






[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860408#comment-17860408
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 1:38 PM:
-

Hello, I launched the attached script:

[^rebalance.sh]

And in my test, after waiting for a few minutes, I get:

one connector well balanced
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     15     "worker_id": "k2:8082"
     15     "worker_id": "k3:8083"
     15     "worker_id": "k4:8084"
     15     "worker_id": "k5:8085"
     15     "worker_id": "k6:8086"
     15     "worker_id": "k7:8087"
     15     "worker_id": "k8:8088"
     15     "worker_id": "k9:8089"
{code}
 

And the other one unbalanced:

{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
     27     "worker_id": "k1:8081"
     11     "worker_id": "k2:8082"
     12     "worker_id": "k3:8083"
     11     "worker_id": "k4:8084"
     12     "worker_id": "k5:8085"
     12     "worker_id": "k6:8086"
     11     "worker_id": "k7:8087"
     12     "worker_id": "k8:8088"
     12     "worker_id": "k9:8089"
{code}
 

Regards


was (Author: yazgoo):
Hello, I launched the attached script:

[^rebalance.sh]

In one of my tests, I get one connector well balanced:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     15     "worker_id": "k2:8082"
     15     "worker_id": "k3:8083"
     15     "worker_id": "k4:8084"
     15     "worker_id": "k5:8085"
     15     "worker_id": "k6:8086"
     15     "worker_id": "k7:8087"
     15     "worker_id": "k8:8088"
     15     "worker_id": "k9:8089"
{code}
 


And the other one unbalanced:

{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
     27     "worker_id": "k1:8081"
     11     "worker_id": "k2:8082"
     12     "worker_id": "k3:8083"
     11     "worker_id": "k4:8084"
     12     "worker_id": "k5:8085"
     12     "worker_id": "k6:8086"
     11     "worker_id": "k7:8087"
     12     "worker_id": "k8:8088"
     12     "worker_id": "k9:8089"
{code}
 

Regards

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png, rebalance.sh
>
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
> if a Connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like what is described in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestions on what we could do to mitigate this?





[jira] [Commented] (KAFKA-16791) Add thread detection to RaftClusterInvocationContext/ZkClusterInvocationContext

2024-06-27 Thread PoAn Yang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860492#comment-17860492
 ] 

PoAn Yang commented on KAFKA-16791:
---

Hi [~bboyleonp], if you're not working on this issue, may I try it? Thank you.

> Add thread detection to 
> RaftClusterInvocationContext/ZkClusterInvocationContext
> ---
>
> Key: KAFKA-16791
> URL: https://issues.apache.org/jira/browse/KAFKA-16791
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: bboyleonp
>Priority: Minor
>
> -`ClusterTestExtensions` should implement `BeforeAllCallback` and 
> `AfterAllCallback` by `TestUtils.verifyNoUnexpectedThreads`-
> We can leverage `BeforeEachCallback`/`AfterEachCallback` to implement a new 
> AfterEachCallback instead of `verifyNoUnexpectedThreads`. Note that the new 
> thread detection should avoid cascading failures: a thread leak should make 
> only the specific test case (rather than all subsequent test cases) fail.
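
A minimal sketch of per-test thread detection, written in plain Java without the JUnit dependency: `beforeEach()` and `afterEach()` stand in for the bodies of a `BeforeEachCallback` and an `AfterEachCallback`. `ThreadLeakSketch` is a hypothetical name and this is not Kafka's implementation. Because the baseline is retaken before every test, a thread leaked by one test is already part of the baseline for the next one, so only the leaking test would be flagged.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical sketch; not Kafka's ClusterTestExtensions or TestUtils code.
class ThreadLeakSketch {
    private Set<Thread> baseline = new HashSet<>();

    // Stand-in for BeforeEachCallback: remember the threads that already exist.
    void beforeEach() {
        baseline = new HashSet<>(Thread.getAllStackTraces().keySet());
    }

    // Stand-in for AfterEachCallback: names of live threads started since beforeEach().
    Set<String> afterEach() {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && !baseline.contains(t))
                .map(Thread::getName)
                .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(String[] args) {
        ThreadLeakSketch detector = new ThreadLeakSketch();

        // "Test 1" leaks a thread.
        detector.beforeEach();
        Thread leaked = new Thread(() -> {
            try {
                Thread.sleep(60_000);
            } catch (InterruptedException e) {
                // interrupted at the end of the demo
            }
        }, "leaky-worker");
        leaked.setDaemon(true);
        leaked.start();
        System.out.println("after test 1: " + detector.afterEach());

        // "Test 2" starts nothing; the old leak is now part of its baseline.
        detector.beforeEach();
        System.out.println("after test 2: " + detector.afterEach());

        leaked.interrupt();
    }
}
```

A real extension would fail the test when `afterEach()` returns a non-empty set, probably after filtering out threads that are expected to outlive a test.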





[jira] [Assigned] (KAFKA-16791) Add thread detection to RaftClusterInvocationContext/ZkClusterInvocationContext

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-16791:
--

Assignee: PoAn Yang  (was: bboyleonp)

> Add thread detection to 
> RaftClusterInvocationContext/ZkClusterInvocationContext
> ---
>
> Key: KAFKA-16791
> URL: https://issues.apache.org/jira/browse/KAFKA-16791
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: PoAn Yang
>Priority: Minor
>
> -`ClusterTestExtensions` should implement `BeforeAllCallback` and 
> `AfterAllCallback` by `TestUtils.verifyNoUnexpectedThreads`-
> We can leverage `BeforeEachCallback`/`AfterEachCallback` to implement a new 
> AfterEachCallback instead of `verifyNoUnexpectedThreads`. Note that the new 
> thread detection should avoid cascading failures: a thread leak should make 
> only the specific test case (rather than all subsequent test cases) fail.





[jira] [Commented] (KAFKA-16364) MM2 High-Resolution Offset Translation

2024-06-27 Thread M Mehrtens (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860507#comment-17860507
 ] 

M Mehrtens commented on KAFKA-16364:


This is especially relevant for cluster migration scenarios when consumer group 
offsets should be replicated with a minimum latency. Ideally, consumer groups 
could detach from the source cluster and attach to the target cluster without 
needing to process a batch of duplicate messages. This would require offset 
sync/translation for inactive consumer groups - which seems like a pretty 
reasonable addition, especially if the consumer groups were previously synced.

> MM2 High-Resolution Offset Translation
> --
>
> Key: KAFKA-16364
> URL: https://issues.apache.org/jira/browse/KAFKA-16364
> Project: Kafka
>  Issue Type: New Feature
>  Components: mirrormaker
>Reporter: Greg Harris
>Priority: Minor
>  Labels: needs-kip
>
> The current OffsetSyncStore implementation 
> [https://github.com/apache/kafka/blob/8b72a2c72f09838fdd2e7416c98d30fe876b4078/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java#L57]
>  stores a sparse index of offset syncs. This attempts to strike a balanced 
> default behavior between offset translation availability, memory usage, and 
> throughput on the offset syncs topic.
> However, this balanced default behavior is not good enough in all 
> circumstances. When precise offset translation is needed away from the end of 
> the topic, such as for consumer groups with persistent lag, offset 
> translation can be more precise. Users should have a way to configure 
> high-precision offset translation, either through additional memory usage or 
> re-reading the offset syncs topic.
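
To illustrate why a sparse index limits precision, here is a simplified sketch. It is not the real OffsetSyncStore: `SparseOffsetIndexSketch`, `addSync` and `translate` are hypothetical names, and the rule of returning "sync downstream offset + 1" when there is no exact match is an assumption chosen as one conservative option.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical illustration; not the real OffsetSyncStore.
class SparseOffsetIndexSketch {
    // upstream offset -> downstream offset, for the syncs that were kept
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void addSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Translates using the closest kept sync at or below the requested upstream offset.
    OptionalLong translate(long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            return OptionalLong.empty(); // no sync old enough to translate from
        }
        // Exact hit: precise. Otherwise only "just past the sync" is known to be safe.
        long step = sync.getKey() == upstreamOffset ? 0 : 1;
        return OptionalLong.of(sync.getValue() + step);
    }

    public static void main(String[] args) {
        SparseOffsetIndexSketch index = new SparseOffsetIndexSketch();
        index.addSync(0, 0);
        index.addSync(1000, 1000);
        // A lagging group at upstream offset 900 falls back to the sync at 0.
        System.out.println("sparse: " + index.translate(900));

        // Keeping one more sync near the group's position tightens the result.
        index.addSync(890, 890);
        System.out.println("denser: " + index.translate(900));
    }
}
```

In this sketch a group at upstream offset 900 translates to downstream offset 1 with only the syncs at 0 and 1000, and to 891 once a sync at 890 is kept; the cost of the extra precision is the memory for the additional entries.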





[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo commented on KAFKA-10413:


Here is another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:


{code:java}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 \
      --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env

launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done

sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}

When the script ends, I have each connector on one worker: one worker with the 
connector #1 tasks, the other one with the connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     30     "worker_id": "k2:8082"
     30     "worker_id": "k3:8083"
     30     "worker_id": "k4:8084"
     30     "worker_id": "k5:8085"
{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
     48     "worker_id": "k1:8081"
     18     "worker_id": "k2:8082"
     18     "worker_id": "k3:8083"
     18     "worker_id": "k4:8084"
     18     "worker_id": "k5:8085"
{code}
 

> rebalancing leads

[jira] [Commented] (KAFKA-7339) Migrate from JUnit 4 to JUnit 5

2024-06-27 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860515#comment-17860515
 ] 

Ismael Juma commented on KAFKA-7339:


Yay!

> Migrate from JUnit 4 to JUnit 5
> ---
>
> Key: KAFKA-7339
> URL: https://issues.apache.org/jira/browse/KAFKA-7339
> Project: Kafka
> Issue Type: Improvement
> Reporter: Ismael Juma
> Assignee: Chia-Ping Tsai
> Priority: Major
> Fix For: 3.9.0
>
>






[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:39 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}
When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     30     "worker_id": "k2:8082"
     30     "worker_id": "k3:8083"
     30     "worker_id": "k4:8084"
     30     "worker_id": "k5:8085" {code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     48     "worker_id": "k1:8081"
     18     "worker_id": "k2:8082"
     18     "worker_id": "k3:8083"
     18     "worker_id": "k4:8084"
     18     "w

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:40 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:




 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}
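The readiness loop in the script above polls forever if Kafka Connect never comes up. A bounded variant could look like the sketch below. `retry_until` is a hypothetical helper name, and the attempt count, the delay and the `-sf` curl flags in the usage comment are assumptions, not something the original script uses.

```shell
#!/bin/bash
# Hypothetical helper: run a command until it succeeds, at most $1 times,
# sleeping $2 seconds between attempts. Returns 1 if it never succeeds.
retry_until() {
  local attempts=$1 delay=$2 n
  shift 2
  for (( n = 1; n <= attempts; n++ )); do
    if "$@"; then
      return 0
    fi
    # Do not sleep after the final failed attempt.
    if (( n < attempts )); then
      sleep "$delay"
    fi
  done
  return 1
}

# Possible use in the script (commented out so the sketch has no side effects):
# retry_until 60 5 curl -sf http://localhost:8081/ || exit 1
```

With `set -e` active, as in the script, the `|| exit 1` keeps the failure handling explicit instead of relying on the implicit exit.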
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     30     "worker_id": "k2:8082"
     30     "worker_id": "k3:8083"
     30     "worker_id": "k4:8084"
     30     "worker_id": "k5:8085" {code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     48     "worker_id": "k1:8081"
     18     "worker_id": "k2:8082"
     18     "worker_id": "k3:8083"
     18     "worker_id": "k

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:47 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:
 

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     80     "worker_id": "k1:8081"
     20     "worker_id": "k2:8082"
     20     "worker_id": "k3:8083"
{code}
 

In the end, we do get 80 tasks on each worker, but for distribution reasons, I 
think it should be (40, 40, 4

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:47 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     80     "worker_id": "k1:8081"
     20     "worker_id": "k2:8082"
     20     "worker_id": "k3:8083"
{code}
 

In the end, we do get 80 tasks on each worker, but for distribution reasons, I 
think it should be (40, 40, 40) for each connector.
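
The arithmetic behind the (40, 40, 40) expectation can be sketched as follows. This is only an illustration of an even per-connector split: `even_split` is a hypothetical helper and does not reflect how the Kafka Connect assignor actually works.

```shell
#!/bin/bash
# Hypothetical helper (not part of Kafka Connect): print how many tasks each
# worker would hold if $1 tasks were spread as evenly as possible over $2
# workers. Any remainder goes to the first workers, one extra task each.
even_split() {
  local tasks=$1 workers=$2
  local base=$(( tasks / workers ))
  local extra=$(( tasks % workers ))
  local out=() i
  for (( i = 0; i < workers; i++ )); do
    if (( i < extra )); then
      out+=( $(( base + 1 )) )
    else
      out+=( "$base" )
    fi
  done
  echo "${out[*]}"
}

even_split 120 3   # one connector with tasks.max=120 on 3 workers
```

The call prints `40 40 40`. Doing this for both connectors still gives 80 tasks per worker, the same total as the observed (80, 20, 20) and (0, 60, 60) assignment.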

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:52 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.


{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:54 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-conne

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:54 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-conne

[jira] [Created] (KAFKA-17048) Document how to use KIP-853

2024-06-27 Thread Jira
José Armando García Sancio created KAFKA-17048:
--

 Summary: Document how to use KIP-853
 Key: KAFKA-17048
 URL: https://issues.apache.org/jira/browse/KAFKA-17048
 Project: Kafka
  Issue Type: Sub-task
Reporter: José Armando García Sancio


This should include
 # Changes to the quick start
 # Operational changes in the Apache Kafka documentation
 # Recommended configuration for KIP-853
 # Commands and runbook for creating a cluster
 # Commands and metrics for monitoring a KRaft cluster with KIP-853





[jira] [Commented] (KAFKA-16781) Expose advertised.listeners in controller node

2024-06-27 Thread Gantigmaa Selenge (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860554#comment-17860554
 ] 

Gantigmaa Selenge commented on KAFKA-16781:
---

[~showuon] I think we can close this issue as it's already implemented in 
[https://github.com/apache/kafka/pull/16235] 

> Expose advertised.listeners in controller node
> --
>
> Key: KAFKA-16781
> URL: https://issues.apache.org/jira/browse/KAFKA-16781
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Gantigmaa Selenge
>Priority: Major
>  Labels: need-kip, newbie, newbie++
>
> After 
> [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration],
>  we allow clients to talk to the KRaft controller node directly. But unlike 
> on broker nodes, we don't allow users to configure advertised.listeners for 
> clients to connect to. Without this config, the client cannot connect to the 
> controller node if the controller is sitting behind a NAT network while the 
> client is on the external network.





[jira] [Created] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)
yazgoo created KAFKA-17049:
--

 Summary: unbalanced connectors
 Key: KAFKA-17049
 URL: https://issues.apache.org/jira/browse/KAFKA-17049
 Project: Kafka
  Issue Type: Bug
Reporter: yazgoo


This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script (which creates two connectors on an existing 
worker, then adds two workers):


{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
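
The per-worker tallies above can be condensed into a single imbalance figure. The following is only a minimal sketch and is not part of the original report: `tasks_per_worker` and `task_spread` are hypothetical helper names, and they assume one `worker_id` per line on stdin (for example, as produced by `jq -r '.tasks[].worker_id'`). Workers that hold no tasks never appear in the input, so they are not counted.

```shell
#!/bin/bash
# Hypothetical helpers (assumption: input is one worker_id per line on stdin).

# Prints "<worker_id> <task_count>" for each worker, sorted by worker_id.
tasks_per_worker() {
  sort | uniq -c | awk '{print $2, $1}'
}

# Prints the difference between the most and the least loaded worker.
# A spread of 0 or 1 means the tasks seen in the input are evenly balanced.
task_spread() {
  tasks_per_worker | awk '
    NR == 1 || $2 > max { max = $2 }
    NR == 1 || $2 < min { min = $2 }
    END { print max - min }'
}

# Possible usage against the setup from the script (requires curl and jq):
# for c in s3-connector1 s3-connector2; do
#   curl -s "http://localhost:8081/connectors/$c/status" | jq -r '.tasks[].worker_id'
# done | task_spread
```

Feeding the worker ids of both connectors into `task_spread` gives one number to watch while waiting for the rebalance to settle, rather than comparing the `uniq -c` output by eye.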
 
{code:jav

[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860562#comment-17860562
 ] 

yazgoo commented on KAFKA-10413:


I created https://issues.apache.org/jira/browse/KAFKA-17049

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png, rebalance.sh
>
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
> if a Connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like what is mentioned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestions on what we could do to mitigate this?





[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Component/s: connect

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413
> When running the following script (which creates two connectors on an existing 
> worker, then adds two workers):
> {code:java}
> #!/bin/bash
> set -xe
> dkill() {
>   docker stop "$1" || true
>   docker rm -v -f "$1" || true
> }
> write_topic() {
>   # write 200 messages to the topic
>   json='{"name": "test"}'
>   docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
> }
> launch_minio() {
>   # Launch Minio (Fake S3)
>   docker run --network host -d --name minio \
>     -e MINIO_ROOT_USER=minioadmin \
>     -e MINIO_ROOT_PASSWORD=minioadmin \
>     minio/minio server --console-address :9001 /data
>       docker exec -it minio mkdir /data/my-minio-bucket
> }
> launch_kafka_connect() {
>   # Start Kafka Connect with S3 Connector
>   docker run --network host -d --name "kafka-connect$1" \
>     -e  AWS_ACCESS_KEY_ID=minioadmin \
>     -e  AWS_SECRET_ACCESS_KEY=minioadmin \
>     -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
>     -e  CONNECT_LISTENERS="http://localhost:808$1" \
>     -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
>     -e  CONNECT_REST_PORT="808$1" \
>     -e  CONNECT_GROUP_ID="connect-cluster" \
>     -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
>     -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
>     -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
>     -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
>     --entrypoint bash \
>     confluentinc/cp-kafka-connect:7.6.1 \
>     -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
> }
> cleanup_docker_env() {
>   docker volume prune -f
>   for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
>   do
>     dkill "$container"
>   done
> }
> launch_kafka() {
>   docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
>   for i in {1..2}
>   do
>     # Create a Kafka topic
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
>     write_topic "$i"
>   done
>   for topic in connect-configs connect-offsets connect-status
>   do
>     # with cleanup.policy=compact, we can't have more than 1 partition
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
>   done
> }
> cleanup_docker_env
> launch_kafka
> launch_minio
> launch_kafka_connect 1
> while true
> do
>   sleep 5
>   # Check if Kafka Connect is up
>   curl http://localhost:8081/ || continue
>   break
> done
> sleep 10
> for i in {1..2}
> do
> # Set up a connector
> curl -X POST -H "Content-Type: application/json" --data '{
>   "name": "s3-connector'"$i"'",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "tasks.max": "120",
>     "topics": "test_topic'"$i"'",
>     "s3.region": "us-east-1",
>     "store.url": "http://0.0.0.0:9000",
>     "s3.bucket.name": "my-minio-bucket",
>     "s3.part.size": "5242880",
>     "flush.size": "3",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
>     "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
>     "schema.compatibility": "NONE"
>   }
> }' http://localhost:8081/connectors
> done
> launch_kafka_connect 2
> launch_kafka_connect 3
> {code}
>  
>  
> When the script ends, I have one worker with connector #1 tasks, the other 
> one with connector #2 tasks.
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
> |grep worker_id | sort | uniq

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers


{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|gre

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep 

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | \
    /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 \
    --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest \
      && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 \
      --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
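As a side note, the readiness loop in the script retries forever if the first worker never comes up. A bounded variant could look like the sketch below. The helper name `wait_until` is hypothetical (it is not part of the original script), and the attempt count and delay in the usage comment are arbitrary choices.

```shell
#!/bin/bash
# wait_until MAX_ATTEMPTS DELAY_SECONDS COMMAND [ARGS...]
# Hypothetical helper: re-runs COMMAND until it succeeds or the attempts
# run out. Returns 0 on the first success, 1 if every attempt failed.
wait_until() {
  local max_attempts=$1 delay=$2 attempt
  shift 2
  for (( attempt = 1; attempt <= max_attempts; attempt++ )); do
    if "$@"; then
      return 0
    fi
    sleep "$delay"
  done
  return 1
}

# Possible use in place of the `while true` loop (not executed here):
# wait_until 60 5 curl -sf http://localhost:8081/
```

Because the command runs inside an `if`, a failing attempt does not trip `set -e`; only the final `return 1` would stop the script.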
 

 

When the script ends, the first worker (k1) holds all the tasks of both 
connector #1 and connector #2:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
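To put a number on the imbalance in `uniq -c` output like the one above, a small helper can compute the spread between the most and least loaded workers. This is only a sketch: `spread_from_counts` is a hypothetical name, and it assumes the total number of workers (3 here) is passed as an argument, since a worker holding no task does not appear in the output at all.

```shell
#!/bin/bash
# spread_from_counts WORKERS
# Reads `uniq -c`-style lines (count in the first column) on stdin and prints
# max - min of the per-worker task counts. Workers that are absent from the
# input are assumed to hold 0 tasks.
spread_from_counts() {
  local workers=$1
  awk -v workers="$workers" '
    NF {
      n++
      c = $1 + 0
      if (n == 1 || c > max) max = c
      if (n == 1 || c < min) min = c
    }
    END {
      if (n < workers) min = 0   # at least one worker holds no task
      print max - min
    }'
}

# Connector #2 output from above: k1 is missing, so the spread is 60.
printf '     60     "worker_id": "k2:8082"\n     60     "worker_id": "k3:8083"\n' \
  | spread_from_counts 3
```

An even (40, 40, 40) assignment of the 120 tasks would print 0 instead.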
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers



 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest \
      && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 \
      --topic "test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "12",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, the first worker (k1) holds all the tasks of both 
connector #1 and connector #2:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     6     "worker_id": "k2:8082"
     6     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
     8     "worker_id": "k1:8081"
     2     "worker_id": "k2:8082"
     2     "worker_id": "k3:8083"
{code}
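The arithmetic behind these numbers can be checked with a short sketch. The helper names `even_split` and `sum_per_worker` are hypothetical, and the per-worker counts are copied from the two status outputs above in the order k1, k2, k3 (connector #2 has no task on k1, hence the 0).

```shell
#!/bin/bash
# even_split TASKS WORKERS
# Hypothetical helper: prints how many tasks each worker gets when TASKS are
# spread as evenly as possible; the first (TASKS % WORKERS) workers get one more.
even_split() {
  local tasks=$1 workers=$2 i
  local base=$(( tasks / workers )) extra=$(( tasks % workers ))
  local out=()
  for (( i = 0; i < workers; i++ )); do
    if (( i < extra )); then
      out+=( $(( base + 1 )) )
    else
      out+=( "$base" )
    fi
  done
  echo "${out[*]}"
}

# sum_per_worker "c1_k1 c1_k2 c1_k3" "c2_k1 c2_k2 c2_k3" ...
# Hypothetical helper: adds up the per-worker counts of several connectors.
sum_per_worker() {
  local totals=() counts row i
  for row in "$@"; do
    read -r -a counts <<< "$row"
    for i in "${!counts[@]}"; do
      totals[i]=$(( ${totals[i]:-0} + counts[i] ))
    done
  done
  echo "${totals[*]}"
}

even_split 12 3                 # even per-connector split over 3 workers
sum_per_worker "8 2 2" "0 6 6"  # observed: connector #1, then connector #2
```

The first call prints `4 4 4` and the second prints `8 8 8`: the workers carry the same total load, while neither connector is spread evenly on its own.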
 

In the end, we do get 8 tasks on each worker, but for distribution reasons, 
I think it should be (4, 4, 4) for each connector, bec

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1"; \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 \
      --topic "test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "12",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker taking all the connectors/tasks:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     6     "worker_id": "k2:8082"
     6     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     8     "worker_id": "k1:8081"
     2     "worker_id": "k2:8082"
     2     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 8 tasks on each worker, but for distribution reasons, 
I think it should be (4, 4, 4) for each connector, because all connectors 
don't do the 

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 \
      --topic "test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "12",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have the first worker taking all the connectors/tasks:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"
{code}
 

Then I wait a few minutes,

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     6     "worker_id": "k2:8082"
     6     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     8     "worker_id": "k1:8081"
     2     "worker_id": "k2:8082"
     2     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 8 tasks on each worker, but for distribution reasons, 
I think it should be (4, 4, 4) for each connector, because all connectors 
do

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 \
      --topic "test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic $topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "12",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000",
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have the first worker taking all the connectors/tasks:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     6     "worker_id": "k2:8082"
     6     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     8     "worker_id": "k1:8081"
     2     "worker_id": "k2:8082"
     2     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 8 tasks on each worker, but for distribution reasons, 
I think it should be (4, 4, 4) for each connector, because all connectors 
don't d

[jira] [Updated] (KAFKA-16228) Add remote log metadata flag to the dump log tool

2024-06-27 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-16228:

Description: 
It would be good to improve the kafka-dump-log.sh tool by adding a decode flag 
for __remote_log_metadata records.

{code}
bin/kafka-dump-log.sh --remote-log-metadata-decoder --files \
  /opt/kafka/data/__remote_log_metadata-0/.log
{code}

  was:
It would be good to improve the kafka-dump-log.sh tool by adding a decode flag 
for __remote_log_metadata records. Something like the following would be useful 
for debugging.

{code}
bin/kafka-dump-log.sh --remote-log-metadata-decoder --files \
  /opt/kafka/data/__remote_log_metadata-0/.log
{code}


> Add remote log metadata flag to the dump log tool
> -
>
> Key: KAFKA-16228
> URL: https://issues.apache.org/jira/browse/KAFKA-16228
> Project: Kafka
>  Issue Type: New Feature
>  Components: Tiered-Storage
>Affects Versions: 3.6.1
>Reporter: Federico Valeri
>Assignee: Federico Valeri
>Priority: Major
>  Labels: kip-required
> Fix For: 3.9.0
>
>
> It would be good to improve the kafka-dump-log.sh tool by adding a decode 
> flag for __remote_log_metadata records.
> {code}
> bin/kafka-dump-log.sh --remote-log-metadata-decoder --files \
>   /opt/kafka/data/__remote_log_metadata-0/.log
> {code}





[jira] [Commented] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860586#comment-17860586
 ] 

Greg Harris commented on KAFKA-17049:
-

Thank you for providing a detailed reproduction case. I wrote this unit test in 
IncrementalCooperativeAssignorTest that displays the same behavior:
{noformat}
@Test
public void checkIndividualConnectorBalance() {
    connectors.clear();
    addNewConnector("connector1", 12);
    performStandardRebalance();
    addNewConnector("connector2", 12);
    performStandardRebalance();
    addNewEmptyWorkers("worker2");
    performStandardRebalance();
    performStandardRebalance();
    addNewEmptyWorkers("worker3");
    performStandardRebalance();
    performStandardRebalance();
}{noformat}

The resulting assignment is:
{noformat}
"worker1" -> {WorkerCoordinator$ConnectorsAndTasks@4287} "{ 
connectorIds=[connector2], taskIds=[connector2-4, connector2-5, connector2-6, 
connector2-7, connector2-8, connector2-9, connector2-10, connector2-11]}"
"worker2" -> {WorkerCoordinator$ConnectorsAndTasks@4289} "{ 
connectorIds=[connector1], taskIds=[connector1-4, connector1-5, connector1-6, 
connector1-7, connector1-8, connector1-9, connector1-10, connector1-11]}"
"worker3" -> {WorkerCoordinator$ConnectorsAndTasks@4291} "{ connectorIds=[], 
taskIds=[connector1-0, connector1-1, connector1-2, connector1-3, connector2-0, 
connector2-1, connector2-2, connector2-3]}"{noformat}
I think the root cause is that the order in which load-balancing revocations 
take place is defined by the iteration order of this set, rather than some more 
intentional strategy.

[https://github.com/apache/kafka/blob/3ebad6349de7d121a31f9d47c5ede7d6bbfac4d1/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L739-L754]
 

[~yazgoo] Are you interested in working on this? I think this will require some 
changes to which revocations the assignor generates.
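
To make the imbalance in the assignment above easier to see, here is a minimal sketch that tallies tasks per connector on each worker. ConnectorBalanceCheck and all of its methods are hypothetical helpers written for illustration; they are not part of IncrementalCooperativeAssignor or its test. The only inputs taken from the output above are the worker names and the task IDs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper (not a Kafka Connect class): tallies tasks per connector on each worker.
final class ConnectorBalanceCheck {

    // Task IDs follow the "<connector>-<index>" shape seen in the assignment output.
    static String connectorOf(String taskId) {
        return taskId.substring(0, taskId.lastIndexOf('-'));
    }

    // Returns worker -> (connector -> number of that connector's tasks on the worker).
    static Map<String, Map<String, Integer>> tally(Map<String, List<String>> tasksByWorker) {
        Map<String, Map<String, Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : tasksByWorker.entrySet()) {
            Map<String, Integer> counts = new TreeMap<>();
            for (String taskId : entry.getValue()) {
                counts.merge(connectorOf(taskId), 1, Integer::sum);
            }
            result.put(entry.getKey(), counts);
        }
        return result;
    }

    // Difference between the most and least loaded worker for one connector (0 means even).
    static int spread(Map<String, Map<String, Integer>> tally, String connector) {
        if (tally.isEmpty()) {
            return 0;
        }
        int min = Integer.MAX_VALUE;
        int max = 0;
        for (Map<String, Integer> counts : tally.values()) {
            int n = counts.getOrDefault(connector, 0);
            min = Math.min(min, n);
            max = Math.max(max, n);
        }
        return max - min;
    }

    // Builds task IDs connector-first .. connector-last (both inclusive).
    static List<String> tasks(String connector, int first, int last) {
        List<String> ids = new ArrayList<>();
        for (int i = first; i <= last; i++) {
            ids.add(connector + "-" + i);
        }
        return ids;
    }

    // The assignment reported by the unit test, transcribed by hand.
    static Map<String, List<String>> reportedAssignment() {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("worker1", tasks("connector2", 4, 11));
        assignment.put("worker2", tasks("connector1", 4, 11));
        List<String> worker3 = tasks("connector1", 0, 3);
        worker3.addAll(tasks("connector2", 0, 3));
        assignment.put("worker3", worker3);
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> counts = tally(reportedAssignment());
        System.out.println(counts);
        System.out.println("connector1 spread: " + spread(counts, "connector1"));
        System.out.println("connector2 spread: " + spread(counts, "connector2"));
    }
}
```

For the assignment above, every worker holds 8 tasks in total, yet the spread for each connector is 8, because one worker runs 8 of its tasks and another runs none. A (4, 4, 4) split per connector would give a spread of 0.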

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413
> When running the following script, which
> 1. runs one worker
> 2. declares two connectors
> 3. adds two more workers
>  
> {code:java}
> #!/bin/bash
> set -xe
> dkill() {
>   docker stop "$1" || true
>   docker rm -v -f "$1" || true
> }
> launch_minio() {
>   # Launch Minio (Fake S3)
>   docker run --network host -d --name minio \
>     -e MINIO_ROOT_USER=minioadmin \
>     -e MINIO_ROOT_PASSWORD=minioadmin \
>     minio/minio server --console-address :9001 /data
>       docker exec -it minio mkdir /data/my-minio-bucket
> }
> launch_kafka_connect() {
>   # Start Kafka Connect with S3 Connector
>   docker run --network host -d --name "kafka-connect$1" \
>     -e  AWS_ACCESS_KEY_ID=minioadmin \
>     -e  AWS_SECRET_ACCESS_KEY=minioadmin \
>     -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
>     -e  CONNECT_LISTENERS="http://localhost:808$1" \
>     -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
>     -e  CONNECT_REST_PORT="808$1" \
>     -e  CONNECT_GROUP_ID="connect-cluster" \
>     -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
>     -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
>     -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
>     -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" 
> \
>     -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  
> CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  
> CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
>  \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  
> CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
>     --entrypoint bash \
>     confluentinc/cp-kafka-connect:7.6.1 \
>     -c "confluent-hub install --no-prompt 
> confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
> }
> cleanup_docker_env() {
>   docker volume prune -f
>   for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
> minio
>   do
>     dkill "$container"
>   done
> }
> launch_kafka() {
>   docker run --network host --hostname localhost --ulimit nofile=65536:65536 
> -d --name kafka -p 9092:9092 apache/kafka
>   for i in {1..2}
>   do
>     # Create a Kafka topic
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
> --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 
> --topic "test_topic$i"
>   done
>   for topic in connect-configs connect-offsets connect-status
>   do
>     # with cleanup.polic

[jira] [Assigned] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-17049:
---

Assignee: Greg Harris

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Assignee: Greg Harris
>Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413
> When running the following script, which
> 1. runs one worker
> 2. declares two connectors
> 3. adds two more workers
>  
> {code:java}
> #!/bin/bash
> set -xe
> dkill() {
>   docker stop "$1" || true
>   docker rm -v -f "$1" || true
> }
> launch_minio() {
>   # Launch Minio (Fake S3)
>   docker run --network host -d --name minio \
>     -e MINIO_ROOT_USER=minioadmin \
>     -e MINIO_ROOT_PASSWORD=minioadmin \
>     minio/minio server --console-address :9001 /data
>       docker exec -it minio mkdir /data/my-minio-bucket
> }
> launch_kafka_connect() {
>   # Start Kafka Connect with S3 Connector
>   docker run --network host -d --name "kafka-connect$1" \
>     -e  AWS_ACCESS_KEY_ID=minioadmin \
>     -e  AWS_SECRET_ACCESS_KEY=minioadmin \
>     -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
>     -e  CONNECT_LISTENERS="http://localhost:808$1" \
>     -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
>     -e  CONNECT_REST_PORT="808$1" \
>     -e  CONNECT_GROUP_ID="connect-cluster" \
>     -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
>     -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
>     -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
>     -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" 
> \
>     -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  
> CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  
> CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
>  \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  
> CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
>     --entrypoint bash \
>     confluentinc/cp-kafka-connect:7.6.1 \
>     -c "confluent-hub install --no-prompt 
> confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
> }
> cleanup_docker_env() {
>   docker volume prune -f
>   for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
> minio
>   do
>     dkill "$container"
>   done
> }
> launch_kafka() {
>   docker run --network host --hostname localhost --ulimit nofile=65536:65536 
> -d --name kafka -p 9092:9092 apache/kafka
>   for i in {1..2}
>   do
>     # Create a Kafka topic
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
> --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 
> --topic "test_topic$i"
>   done
>   for topic in connect-configs connect-offsets connect-status
>   do
>     # with cleanup.policy=compact, we can't have more than 1 partition
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
> --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
> $topic --config cleanup.policy=compact
>   done
> }
> cleanup_docker_env
> launch_kafka
> launch_minio
> launch_kafka_connect 1
> while true
> do
>   sleep 5
>   # Check if Kafka Connect is up
>   curl http://localhost:8081/ || continue
>   break
> done
> sleep 10
> for i in {1..2}
> do
> # Set up a connector
> curl -X POST -H "Content-Type: application/json" --data '{
>   "name": "s3-connector'"$i"'",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "tasks.max": "12",
>     "topics": "test_topic'"$i"'",
>     "s3.region": "us-east-1",
>     "store.url": "http://0.0.0.0:9000",
>     "s3.bucket.name": "my-minio-bucket",
>     "s3.part.size": "5242880",
>     "flush.size": "3",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
>     "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
>     "schema.compatibility": "NONE"
>   }
> }' http://localhost:8081/connectors
> done
> launch_kafka_connect 2
> launch_kafka_connect 3
> {code}
>  
>  
> When the script ends, I have the first worker taking all the connectors/tasks:
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
> |grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"{code}
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
> |grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"
> {code}
>  
> Then I wait a few minutes,
>

[jira] [Assigned] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-17049:
---

Assignee: (was: Greg Harris)

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413
> When running the following script, which
> 1. runs one worker
> 2. declares two connectors
> 3. adds two more workers
>  
> {code:java}
> #!/bin/bash
> set -xe
> dkill() {
>   docker stop "$1" || true
>   docker rm -v -f "$1" || true
> }
> launch_minio() {
>   # Launch Minio (Fake S3)
>   docker run --network host -d --name minio \
>     -e MINIO_ROOT_USER=minioadmin \
>     -e MINIO_ROOT_PASSWORD=minioadmin \
>     minio/minio server --console-address :9001 /data
>       docker exec -it minio mkdir /data/my-minio-bucket
> }
> launch_kafka_connect() {
>   # Start Kafka Connect with S3 Connector
>   docker run --network host -d --name "kafka-connect$1" \
>     -e  AWS_ACCESS_KEY_ID=minioadmin \
>     -e  AWS_SECRET_ACCESS_KEY=minioadmin \
>     -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
>     -e  CONNECT_LISTENERS="http://localhost:808$1" \
>     -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
>     -e  CONNECT_REST_PORT="808$1" \
>     -e  CONNECT_GROUP_ID="connect-cluster" \
>     -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
>     -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
>     -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
>     -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" 
> \
>     -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  
> CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  
> CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter"
>  \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  
> CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
>     --entrypoint bash \
>     confluentinc/cp-kafka-connect:7.6.1 \
>     -c "confluent-hub install --no-prompt 
> confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
> }
> cleanup_docker_env() {
>   docker volume prune -f
>   for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
> minio
>   do
>     dkill "$container"
>   done
> }
> launch_kafka() {
>   docker run --network host --hostname localhost --ulimit nofile=65536:65536 
> -d --name kafka -p 9092:9092 apache/kafka
>   for i in {1..2}
>   do
>     # Create a Kafka topic
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
> --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 
> --topic "test_topic$i"
>   done
>   for topic in connect-configs connect-offsets connect-status
>   do
>     # with cleanup.policy=compact, we can't have more than 1 partition
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
> --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
> $topic --config cleanup.policy=compact
>   done
> }
> cleanup_docker_env
> launch_kafka
> launch_minio
> launch_kafka_connect 1
> while true
> do
>   sleep 5
>   # Check if Kafka Connect is up
>   curl http://localhost:8081/ || continue
>   break
> done
> sleep 10
> for i in {1..2}
> do
> # Set up a connector
> curl -X POST -H "Content-Type: application/json" --data '{
>   "name": "s3-connector'"$i"'",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "tasks.max": "12",
>     "topics": "test_topic'"$i"'",
>     "s3.region": "us-east-1",
>     "store.url": "http://0.0.0.0:9000",
>     "s3.bucket.name": "my-minio-bucket",
>     "s3.part.size": "5242880",
>     "flush.size": "3",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
>     "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
>     "schema.compatibility": "NONE"
>   }
> }' http://localhost:8081/connectors
> done
> launch_kafka_connect 2
> launch_kafka_connect 3
> {code}
>  
>  
> When the script ends, I have the first worker taking all the connectors/tasks:
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
> |grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"{code}
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
> |grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"
> {code}
>  
> Then I wait a few minutes,
> And I get the final sta

[jira] [Assigned] (KAFKA-8812) Rebalance Producers

2024-06-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-8812:


Assignee: Bruno Cadonna

> Rebalance Producers
> ---
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: kip
>
> [KIP-509: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers]
> Please bear with me. Initially this thought sounds stupid but it has its 
> merits.
>  
> How do you build a distributed producer at the moment? You use Kafka Connect 
> which in turn requires a cluster that tells which instance is producing what 
> partitions.
> On the consumer side it is different. There Kafka itself does the data 
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
> data for one partition. With 5 consumers, each will get data from two 
> partitions. And if there is only a single consumer active, it gets all data. 
> All is managed by Kafka, all you have to do is start as many consumers as you 
> want.
>  
> I'd like to suggest something similar for the producers. A producer would 
> tell Kafka that its source has 10 partitions. The Kafka server then responds 
> with a list of partitions this instance shall be responsible for. If it is 
> the only producer, the response would be all 10 partitions. If it is the 
> second instance starting up, the first instance would get the information it 
> should produce data for partition 1-5 and the new one for partition 6-10. If 
> the producer fails to respond with an alive packet, a rebalance does happen, 
> informing the active producer to take more load and the dead producer will 
> get an error when sending data again.
> For restart, the producer rebalance has to send the starting point where to 
> start producing the data onwards from as well, of course. Would be best if 
> this is a user generated pointer and not the topic offset. Then it can be 
> e.g. the database system change number, a database transaction id or 
> something similar.
>  
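The assignment described above can be sketched in a few lines. This is only an illustration, not part of KIP-509 or any Kafka API: the class `ProducerRebalanceSketch`, the method `assign`, and the producer names are hypothetical, and a contiguous range split is assumed because the example in the description (partitions 1-5 and 6-10) suggests one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names exist in Kafka.
final class ProducerRebalanceSketch {

    /**
     * Splits source partitions 1..partitionCount into contiguous ranges,
     * one range per live producer, in the order the producers are listed.
     */
    static Map<String, List<Integer>> assign(List<String> liveProducers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (liveProducers.isEmpty()) {
            return assignment;
        }
        int base = partitionCount / liveProducers.size();
        int extra = partitionCount % liveProducers.size();
        int next = 1;
        for (int i = 0; i < liveProducers.size(); i++) {
            // The first `extra` producers take one additional partition.
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            assignment.put(liveProducers.get(i), owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // A single producer owns every source partition.
        System.out.println(assign(List.of("producer-A"), 10));
        // A second producer joins: the ranges are recomputed (a "rebalance").
        System.out.println(assign(List.of("producer-A", "producer-B"), 10));
    }
}
```

Calling `assign` again with the surviving producers after one of them stops sending alive packets models the rebalance step; the restart pointer mentioned in the description is left out of this sketch.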





[jira] [Assigned] (KAFKA-8812) Rebalance Producers

2024-06-27 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8812?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-8812:


Assignee: (was: Bruno Cadonna)

> Rebalance Producers
> ---
>
> Key: KAFKA-8812
> URL: https://issues.apache.org/jira/browse/KAFKA-8812
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Werner Daehn
>Priority: Major
>  Labels: kip
>
> [KIP-509: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers|https://cwiki.apache.org/confluence/display/KAFKA/KIP-509%3A+Rebalance+and+restart+Producers]
> Please bear with me. Initially this thought sounds stupid but it has its 
> merits.
>  
> How do you build a distributed producer at the moment? You use Kafka Connect 
> which in turn requires a cluster that tells which instance is producing what 
> partitions.
> On the consumer side it is different. There Kafka itself does the data 
> distribution. If you have 10 Kafka partitions and 10 consumers, each will get 
> data for one partition. With 5 consumers, each will get data from two 
> partitions. And if there is only a single consumer active, it gets all data. 
> All is managed by Kafka, all you have to do is start as many consumers as you 
> want.
>  
> I'd like to suggest something similar for the producers. A producer would 
> tell Kafka that its source has 10 partitions. The Kafka server then responds 
> with a list of partitions this instance shall be responsible for. If it is 
> the only producer, the response would be all 10 partitions. If it is the 
> second instance starting up, the first instance would get the information it 
> should produce data for partition 1-5 and the new one for partition 6-10. If 
> the producer fails to respond with an alive packet, a rebalance does happen, 
> informing the active producer to take more load and the dead producer will 
> get an error when sending data again.
> For restart, the producer rebalance has to send the starting point where to 
> start producing the data onwards from as well, of course. Would be best if 
> this is a user generated pointer and not the topic offset. Then it can be 
> e.g. the database system change number, a database transaction id or 
> something similar.
>  





[jira] [Commented] (KAFKA-13574) NotLeaderOrFollowerException thrown for a successful send

2024-06-27 Thread Laurenceau Julien (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860637#comment-17860637
 ] 

Laurenceau Julien commented on KAFKA-13574:
---

Two years have passed since this bug report, which asserts that exactly-once 
processing is broken.

I see some ideas, but no beginning of a solution. Is there any workaround 
or fix?

Do you think that this is not important?

Maybe a warning should be added to the documentation, because people who 
choose to pay the price of exactly-once generally care a lot about 
consistency!

> NotLeaderOrFollowerException thrown for a successful send
> -
>
> Key: KAFKA-13574
> URL: https://issues.apache.org/jira/browse/KAFKA-13574
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
> Environment: openjdk version "11.0.13" 2021-10-19
>Reporter: Kyle Kingsbury
>Priority: Minor
>  Labels: error-handling
>
> With org.apache.kafka/kafka-clients 3.0.0, under rare circumstances involving 
> multiple node and network failures, I've observed a call to `producer.send()` 
> throw `NotLeaderOrFollowerException` for a message which later appears in 
> `consumer.poll()` return values.
> I don't have a reliable repro case for this yet, but the case I hit involved 
> retries=1000, acks=all, and idempotence enabled. I suspect what might be 
> happening here is that an initial attempt to send the message makes it to the 
> server and is committed, but the acknowledgement is lost e.g. due to timeout; 
> the Kafka producer then automatically retries the send attempt, and on that 
> retry hits a NotLeaderOrFollowerException, which is thrown back to the 
> caller. If we interpret NotLeaderOrFollowerException as a definite failure, 
> then this would constitute an aborted read.
> I've seen issues like this in a number of databases around client or 
> server-internal retry mechanisms, and I think the thing to do is: rather than 
> throwing the most *recent* error, throw the {*}most indefinite{*}. That way 
> clients know that their request may have actually succeeded, and they won't 
> (e.g.) attempt to re-submit a non-idempotent request again.
> As a side note: is there... perhaps documentation on which errors in Kafka 
> are supposed to be definite vs indefinite? NotLeaderOrFollowerException is a 
> subclass of RetriableException, but it looks like RetriableException is more 
> about transient vs permanent errors than whether it's safe to retry.
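The "throw the most indefinite error" suggestion in the description can be illustrated with a small sketch. Everything here is an assumption made for illustration: Kafka has no `Certainty` enum or `Attempt` type, and the classification of each error as definite or indefinite is supplied by hand rather than derived from any Kafka exception hierarchy.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: these types are not part of the Kafka client.
final class MostIndefiniteErrorSketch {

    /** Whether a failed attempt is known not to have been applied. */
    enum Certainty { DEFINITE, INDEFINITE }

    /** One failed send attempt and how certain its failure is. */
    static final class Attempt {
        final Exception error;
        final Certainty certainty;

        Attempt(Exception error, Certainty certainty) {
            this.error = error;
            this.certainty = certainty;
        }
    }

    /**
     * Picks the error to surface to the caller after all retries failed:
     * the first indefinite error if any attempt may have succeeded,
     * otherwise the most recent (definite) error.
     */
    static Exception errorToReport(List<Attempt> failedAttempts) {
        if (failedAttempts.isEmpty()) {
            throw new IllegalArgumentException("at least one failed attempt is required");
        }
        for (Attempt attempt : failedAttempts) {
            if (attempt.certainty == Certainty.INDEFINITE) {
                return attempt.error;
            }
        }
        return failedAttempts.get(failedAttempts.size() - 1).error;
    }

    public static void main(String[] args) {
        // First attempt: acknowledgement lost, so the write may have been committed.
        // Second attempt: treated here as a definite failure, for illustration only.
        List<Attempt> attempts = Arrays.asList(
                new Attempt(new TimeoutException("acknowledgement lost"), Certainty.INDEFINITE),
                new Attempt(new IllegalStateException("not leader or follower"), Certainty.DEFINITE));
        System.out.println(errorToReport(attempts));
    }
}
```

With this selection rule the caller sees the timeout rather than the later error, which signals that the request may already have been applied.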





[jira] [Commented] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860647#comment-17860647
 ] 

yazgoo commented on KAFKA-17049:


Thanks [~gharris1727] ,

I'll try and have a look at it !

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413
> When running the following script, which
> 1. runs one worker
> 2. declares two connectors
> 3. adds two more workers
>  
> {code:java}
> #!/bin/bash
> set -xe
> dkill() {
>   docker stop "$1" || true
>   docker rm -v -f "$1" || true
> }
> launch_minio() {
>   # Launch Minio (Fake S3)
>   docker run --network host -d --name minio \
>     -e MINIO_ROOT_USER=minioadmin \
>     -e MINIO_ROOT_PASSWORD=minioadmin \
>     minio/minio server --console-address :9001 /data
>       docker exec -it minio mkdir /data/my-minio-bucket
> }
> launch_kafka_connect() {
>   # Start Kafka Connect with S3 Connector
>   docker run --network host -d --name "kafka-connect$1" \
>     -e  AWS_ACCESS_KEY_ID=minioadmin \
>     -e  AWS_SECRET_ACCESS_KEY=minioadmin \
>     -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
>     -e  CONNECT_LISTENERS="http://localhost:808$1" \
>     -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
>     -e  CONNECT_REST_PORT="808$1" \
>     -e  CONNECT_GROUP_ID="connect-cluster" \
>     -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
>     -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
>     -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
>     -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
>     --entrypoint bash \
>     confluentinc/cp-kafka-connect:7.6.1 \
>     -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
> }
> cleanup_docker_env() {
>   docker volume prune -f
>   for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
>   do
>     dkill "$container"
>   done
> }
> launch_kafka() {
>   docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
>   for i in {1..2}
>   do
>     # Create a Kafka topic
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i"
>   done
>   for topic in connect-configs connect-offsets connect-status
>   do
>     # with cleanup.policy=compact, we can't have more than 1 partition
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
>   done
> }
> cleanup_docker_env
> launch_kafka
> launch_minio
> launch_kafka_connect 1
> while true
> do
>   sleep 5
>   # Check if Kafka Connect is up
>   curl http://localhost:8081/ || continue
>   break
> done
> sleep 10
> for i in {1..2}
> do
> # Set up a connector
> curl -X POST -H "Content-Type: application/json" --data '{
>   "name": "s3-connector'"$i"'",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "tasks.max": "12",
>     "topics": "test_topic'"$i"'",
>     "s3.region": "us-east-1",
>     "store.url": "http://0.0.0.0:9000",
>     "s3.bucket.name": "my-minio-bucket",
>     "s3.part.size": "5242880",
>     "flush.size": "3",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
>     "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
>     "schema.compatibility": "NONE"
>   }
> }' http://localhost:8081/connectors
> done
> launch_kafka_connect 2
> launch_kafka_connect 3
> {code}
>  
>  
> When the script ends, I have the first worker taking all the connectors/tasks:
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"{code}
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"
> {co

[jira] [Created] (KAFKA-17050) Revert group.version for 3.8 and 3.9

2024-06-27 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-17050:
--

 Summary: Revert group.version for 3.8 and 3.9
 Key: KAFKA-17050
 URL: https://issues.apache.org/jira/browse/KAFKA-17050
 Project: Kafka
  Issue Type: Task
Affects Versions: 3.8.0, 3.9.0
Reporter: Justine Olshan
Assignee: Justine Olshan


After much discussion on KAFKA-17011, we decided it would be best to 
just remove the group version feature for 3.8. 

As for 3.9, [~dajac] said it would be easier for EA users of the group 
coordinator to have a single way to configure it. For 4.0 we can reintroduce it.





[jira] [Updated] (KAFKA-17011) SupportedFeatures.MinVersion incorrectly blocks v0

2024-06-27 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-17011:
---
Priority: Critical  (was: Blocker)

> SupportedFeatures.MinVersion incorrectly blocks v0
> --
>
> Key: KAFKA-17011
> URL: https://issues.apache.org/jira/browse/KAFKA-17011
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.8.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Critical
> Fix For: 3.8.0
>
>
> SupportedFeatures.MinVersion incorrectly blocks v0





[jira] [Commented] (KAFKA-16986) After upgrading to Kafka 3.4.1, the producer constantly produces logs related to topicId changes

2024-06-27 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860662#comment-17860662
 ] 

Justine Olshan commented on KAFKA-16986:


[~viniciusxyz] just curious – this is a ZK cluster I assume since the upgrade 
was from an earlier version? And I'm curious if we have metadata responses for 
these producers (request logging) 

I am also looking at a few more avenues on my end. 

It looks like somehow the topic ID is being removed from the producer's 
metadata cache so it looks like the topic ID in a metadata response is the 
first instance of the topic ID. We included this so in the upgrade from > 2.7 
-> < 2.8 we would do the epoch reset correctly. It shouldn't trigger as often 
as your logs show though. Checking the client code to see if there was some 
assumption made about retaining this ID. 

> After upgrading to Kafka 3.4.1, the producer constantly produces logs related 
> to topicId changes
> 
>
> Key: KAFKA-16986
> URL: https://issues.apache.org/jira/browse/KAFKA-16986
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 3.0.1, 3.6.1
>Reporter: Vinicius Vieira dos Santos
>Priority: Minor
> Attachments: image.png
>
>
> When updating the Kafka broker from version 2.7.0 to 3.4.1, we noticed that 
> the applications began to log the message "{*}Resetting the last seen epoch 
> of partition PAYMENTS-0 to 0 since the associated topicId changed from null 
> to szRLmiAiTs8Y0nI8b3Wz1Q{*}" very frequently. From what I understand, this 
> behavior is not expected: the topic was not deleted and recreated, so the 
> client should simply use the cached data and not reach this log line.
> We have some applications with around 15 topics and 40 partitions, which means 
> around 600 log lines whenever a metadata update occurs.
> The main thing for me is to know whether this could indicate a problem, or 
> whether I can simply change the log level of the 
> org.apache.kafka.clients.Metadata class to WARN without worry.
>  
> There are other reports of the same behavior like this:  
> [https://stackoverflow.com/questions/74652231/apache-kafka-resetting-the-last-seen-epoch-of-partition-why]
>  
> *Some log occurrences over an interval of about 7 hours, each block refers to 
> an instance of the application in kubernetes*
>  
> !image.png!
> *My scenario:*
> *Application:*
>  - Java: 21
>  - Client: 3.6.1, also tested on 3.0.1 and has the same behavior
> *Broker:*
>  - Cluster running on Kubernetes with the bitnami/kafka:3.4.1-debian-11-r52 
> image
>  
> *Producer Config*
>  
>     acks = -1
>     auto.include.jmx.reporter = true
>     batch.size = 16384
>     bootstrap.servers = [server:9092]
>     buffer.memory = 33554432
>     client.dns.lookup = use_all_dns_ips
>     client.id = producer-1
>     compression.type = gzip
>     connections.max.idle.ms = 54
>     delivery.timeout.ms = 3
>     enable.idempotence = true
>     interceptor.classes = []
>     key.serializer = class 
> org.apache.kafka.common.serialization.ByteArraySerializer
>     linger.ms = 0
>     max.block.ms = 6
>     max.in.flight.requests.per.connection = 1
>     max.request.size = 1048576
>     metadata.max.age.ms = 30
>     metadata.max.idle.ms = 30
>     metric.reporters = []
>     metrics.num.samples = 2
>     metrics.recording.level = INFO
>     metrics.sample.window.ms = 3
>     partitioner.adaptive.partitioning.enable = true
>     partitioner.availability.timeout.ms = 0
>     partitioner.class = null
>     partitioner.ignore.keys = false
>     receive.buffer.bytes = 32768
>     reconnect.backoff.max.ms = 1000
>     reconnect.backoff.ms = 50
>     request.timeout.ms = 3
>     retries = 3
>     retry.backoff.ms = 100
>     sasl.client.callback.handler.class = null
>     sasl.jaas.config = [hidden]
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     sasl.kerberos.min.time.before.relogin = 6
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     sasl.login.callback.handler.class = null
>     sasl.login.class = null
>     sasl.login.connect.timeout.ms = null
>     sasl.login.read.timeout.ms = null
>     sasl.login.refresh.buffer.seconds = 300
>     sasl.login.refresh.min.period.seconds = 60
>     sasl.login.refresh.window.factor = 0.8
>     sasl.login.refresh.window.jitter = 0.05
>     sasl.login.retry.backoff.max.ms = 1
>     sasl.login.retry.backoff.ms = 100
>     sasl.mechanism = PLAIN
>     sasl.oauthbearer.clock.skew.seconds = 30
>     sasl.oauthbearer.expected.audience = null
>     sasl.oauthbearer.expected.issuer = null
>     sasl.oauthbearer.jwks.endpoint.refresh.ms = 36000

[jira] [Updated] (KAFKA-17049) Incremental rebalances assign too many tasks for the same connector together

2024-06-27 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-17049:

Summary: Incremental rebalances assign too many tasks for the same 
connector together  (was: unbalanced connectors)

> Incremental rebalances assign too many tasks for the same connector together
> 
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413
> When running the following script, which
> 1. runs one worker
> 2. declares two connectors
> 3. adds two more workers
>  
> {code:java}
> #!/bin/bash
> set -xe
> dkill() {
>   docker stop "$1" || true
>   docker rm -v -f "$1" || true
> }
> launch_minio() {
>   # Launch Minio (Fake S3)
>   docker run --network host -d --name minio \
>     -e MINIO_ROOT_USER=minioadmin \
>     -e MINIO_ROOT_PASSWORD=minioadmin \
>     minio/minio server --console-address :9001 /data
>       docker exec -it minio mkdir /data/my-minio-bucket
> }
> launch_kafka_connect() {
>   # Start Kafka Connect with S3 Connector
>   docker run --network host -d --name "kafka-connect$1" \
>     -e  AWS_ACCESS_KEY_ID=minioadmin \
>     -e  AWS_SECRET_ACCESS_KEY=minioadmin \
>     -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
>     -e  CONNECT_LISTENERS="http://localhost:808$1" \
>     -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
>     -e  CONNECT_REST_PORT="808$1" \
>     -e  CONNECT_GROUP_ID="connect-cluster" \
>     -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
>     -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
>     -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
>     -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
>     --entrypoint bash \
>     confluentinc/cp-kafka-connect:7.6.1 \
>     -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
> }
> cleanup_docker_env() {
>   docker volume prune -f
>   for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka minio
>   do
>     dkill "$container"
>   done
> }
> launch_kafka() {
>   docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
>   for i in {1..2}
>   do
>     # Create a Kafka topic
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic "test_topic$i"
>   done
>   for topic in connect-configs connect-offsets connect-status
>   do
>     # with cleanup.policy=compact, we can't have more than 1 partition
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
>   done
> }
> cleanup_docker_env
> launch_kafka
> launch_minio
> launch_kafka_connect 1
> while true
> do
>   sleep 5
>   # Check if Kafka Connect is up
>   curl http://localhost:8081/ || continue
>   break
> done
> sleep 10
> for i in {1..2}
> do
> # Set up a connector
> curl -X POST -H "Content-Type: application/json" --data '{
>   "name": "s3-connector'"$i"'",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "tasks.max": "12",
>     "topics": "test_topic'"$i"'",
>     "s3.region": "us-east-1",
>     "store.url": "http://0.0.0.0:9000",
>     "s3.bucket.name": "my-minio-bucket",
>     "s3.part.size": "5242880",
>     "flush.size": "3",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
>     "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
>     "schema.compatibility": "NONE"
>   }
> }' http://localhost:8081/connectors
> done
> launch_kafka_connect 2
> launch_kafka_connect 3
> {code}
>  
>  
> When the script ends, I have the first worker taking all the connectors/tasks:
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"{code}
> {code:java}
> ❯ curl -s http://localhost:8

[jira] [Resolved] (KAFKA-16781) Expose advertised.listeners in controller node

2024-06-27 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-16781.
---
Fix Version/s: 3.9.0
   Resolution: Fixed

> Expose advertised.listeners in controller node
> --
>
> Key: KAFKA-16781
> URL: https://issues.apache.org/jira/browse/KAFKA-16781
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Luke Chen
>Assignee: Gantigmaa Selenge
>Priority: Major
>  Labels: need-kip, newbie, newbie++
> Fix For: 3.9.0
>
>
> After 
> [KIP-919|https://cwiki.apache.org/confluence/display/KAFKA/KIP-919%3A+Allow+AdminClient+to+Talk+Directly+with+the+KRaft+Controller+Quorum+and+add+Controller+Registration],
>  we allow clients to talk to the KRaft controller node directly. But unlike 
> for broker nodes, we don't allow users to configure advertised.listeners for 
> clients to connect to. Without this config, the client cannot connect to the 
> controller node if the controller sits behind a NAT while the client is on 
> the external network.





[jira] [Updated] (KAFKA-17025) KAFKA-17025: Producer throws uncaught exception in the io thread.

2024-06-27 Thread Hongshun Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongshun Wang updated KAFKA-17025:
--
Summary: KAFKA-17025: Producer throws uncaught exception in the io thread.  
(was: KafkaThread and KafkaProducer expose method to set 
setUncaughtExceptionHandler)

> KAFKA-17025: Producer throws uncaught exception in the io thread.
> -
>
> Key: KAFKA-17025
> URL: https://issues.apache.org/jira/browse/KAFKA-17025
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 3.6.2
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Major
>
> When I use KafkaProducer, an OOM occurs, but the KafkaProducer only logs it 
> and does nothing else:
>  
> {code:java}
> ERROR org.apache.kafka.common.utils.KafkaThread Uncaught exception in thread 'kafka-producer-network-thread | producer-l': java.lang.OutOfMemoryError: Direct buffer memory
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:549) 
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324) 
> at org.apache.kafka.clients.producer.internals.Sender.run 
> at java.lang.Thread.run
> {code}
>  
>  
> I tried to find out what happens:
> 1. It seems that OutOfMemoryError, being an Error, is not caught when 
> org.apache.kafka.clients.producer.internals.Sender#run tries to catch an 
> Exception: 
> {code:java}
> @Override
> public void run() {
>     log.debug("Starting Kafka producer I/O thread.");
>     // main loop, runs until close is called
>     while (running) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
>     // okay we stopped accepting requests but there may still be
>     // requests in the transaction manager, accumulator or waiting for acknowledgment,
>     // wait until these are completed.
>     while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     // Abort the transaction if any commit or abort didn't go through the transaction manager's queue
>     while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
>         if (!transactionManager.isCompleting()) {
>             log.info("Aborting incomplete transaction due to shutdown");
>             transactionManager.beginAbort();
>         }
>         try {
>             runOnce();
>         } catch (Exception e) {
>             log.error("Uncaught error in kafka producer I/O thread: ", e);
>         }
>     }
>     if (forceClose) {
>         // We need to fail all the incomplete transactional requests and batches and wake up the threads waiting on
>         // the futures.
>         if (transactionManager != null) {
>             log.debug("Aborting incomplete transactional requests due to forced shutdown");
>             transactionManager.close();
>         }
>         log.debug("Aborting incomplete batches due to forced shutdown");
>         this.accumulator.abortIncompleteBatches();
>     }
>     try {
>         this.client.close();
>     } catch (Exception e) {
>         log.error("Failed to close network client", e);
>     }
>     log.debug("Shutdown of Kafka producer I/O thread has completed.");
> }
> {code}
>  
> 2. Then KafkaThread's uncaught-exception handler receives it and just logs it:
> {code:java}
> public KafkaThread(final String name, Runnable runnable, boolean daemon) {
>     super(runnable, name);
>     configureThread(name, daemon);
> }
> private void configureThread(final String name, boolean daemon) {
>     setDaemon(daemon);
>     setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
> }{code}
>  
> To be honest, I don't understand why KafkaThread does nothing but log 
> when an uncaught exception occurs. Why not expose a method to set the 
> UncaughtExceptionHandler in KafkaThread or KafkaProducer, so that the user can 
> decide what to do with an uncaught exception, whether to rethrow it or just 
> ignore it?
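The behaviour described in steps 1 and 2 can be reproduced without Kafka. The sketch below is an assumption-laden stand-in, not Kafka code: `UncaughtErrorSketch` and `runAndCapture` are hypothetical names, and the thread body only imitates the `try { runOnce(); } catch (Exception e)` shape of the quoted `Sender#run`. It shows that an `Error` such as `OutOfMemoryError` bypasses `catch (Exception e)`, ends the thread, and reaches whatever `UncaughtExceptionHandler` is installed on it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch using only java.lang.Thread; it does not use Kafka classes.
final class UncaughtErrorSketch {

    /**
     * Runs the given step once on a new thread, inside a try/catch shaped like
     * the one in the quoted loop, and returns whatever the thread's
     * uncaught-exception handler received (null if nothing escaped).
     */
    static Throwable runAndCapture(Runnable runOnce) {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        Thread ioThread = new Thread(() -> {
            try {
                runOnce.run();
            } catch (Exception e) {
                // Exceptions are logged and swallowed, as in the quoted code.
                System.out.println("caught and logged: " + e);
            }
        }, "sketch-io-thread");
        // A caller-supplied handler would go here; this one just records the throwable.
        ioThread.setUncaughtExceptionHandler((thread, error) -> seenByHandler.set(error));
        ioThread.start();
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the sketch thread", e);
        }
        return seenByHandler.get();
    }

    public static void main(String[] args) {
        Throwable afterException = runAndCapture(() -> {
            throw new IllegalStateException("ordinary failure");
        });
        Throwable afterError = runAndCapture(() -> {
            throw new OutOfMemoryError("Direct buffer memory (simulated)");
        });
        System.out.println("handler saw after Exception: " + afterException);
        System.out.println("handler saw after Error: " + afterError);
    }
}
```

In this sketch the handler is the only place where the caller can react to the `Error`, which is why the issue asks for a way to supply one.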





[jira] [Created] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version

2024-06-27 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-17051:
--

 Summary: ApiKeys#toHtml should exclude the APIs having unstable 
latest version
 Key: KAFKA-17051
 URL: https://issues.apache.org/jira/browse/KAFKA-17051
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


see the discussion: 
https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o

The (released) docs should show only the APIs that are ready to be exposed 
publicly. APIs marked with "latestVersionUnstable=true" should be 
excluded.
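The requested filtering amounts to skipping flagged entries when the docs are generated. The sketch below is an assumption for illustration only: `ApiEntry`, `documentedNames`, and the entry called `HypotheticalNewApi` are invented here and do not reflect how `ApiKeys#toHtml` is actually implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: ApiEntry is a stand-in, not Kafka's ApiKeys type.
final class StableApiDocsSketch {

    /** Minimal description of an API for documentation purposes. */
    static final class ApiEntry {
        final String name;
        final boolean latestVersionUnstable;

        ApiEntry(String name, boolean latestVersionUnstable) {
            this.name = name;
            this.latestVersionUnstable = latestVersionUnstable;
        }
    }

    /** Returns the names of the APIs that should appear in released docs. */
    static List<String> documentedNames(List<ApiEntry> apis) {
        List<String> names = new ArrayList<>();
        for (ApiEntry api : apis) {
            // Skip APIs whose latest version is still flagged unstable.
            if (!api.latestVersionUnstable) {
                names.add(api.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<ApiEntry> apis = Arrays.asList(
                new ApiEntry("Produce", false),
                new ApiEntry("HypotheticalNewApi", true),
                new ApiEntry("Fetch", false));
        System.out.println(documentedNames(apis));
    }
}
```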





[jira] [Commented] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version

2024-06-27 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-17051?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860710#comment-17860710
 ] 

黃竣陽 commented on KAFKA-17051:
-

I'm interested in this issue. Could you assign it to me?

> ApiKeys#toHtml should exclude the APIs having unstable latest version
> -
>
> Key: KAFKA-17051
> URL: https://issues.apache.org/jira/browse/KAFKA-17051
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> see the discussion: 
> https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o
> The (released) docs should show only the APIs that are ready to be exposed 
> publicly. APIs marked with "latestVersionUnstable=true" should be 
> excluded.





[jira] [Assigned] (KAFKA-17051) ApiKeys#toHtml should exclude the APIs having unstable latest version

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-17051:
--

Assignee: 黃竣陽  (was: Chia-Ping Tsai)

> ApiKeys#toHtml should exclude the APIs having unstable latest version
> -
>
> Key: KAFKA-17051
> URL: https://issues.apache.org/jira/browse/KAFKA-17051
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: 黃竣陽
>Priority: Minor
>
> see the discussion: 
> https://lists.apache.org/thread/wboozsrjbsv78wxyy9orhkmrdqghxc9o
> The (released) docs should show only the APIs that are ready to be exposed 
> publicly. APIs marked with "latestVersionUnstable=true" should be 
> excluded.





[jira] [Resolved] (KAFKA-17022) Fix error-prone in KafkaApis#handleFetchRequest

2024-06-27 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17022?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-17022.

Fix Version/s: 3.9.0
   Resolution: Fixed

> Fix error-prone in KafkaApis#handleFetchRequest 
> 
>
> Key: KAFKA-17022
> URL: https://issues.apache.org/jira/browse/KAFKA-17022
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: TengYao Chi
>Priority: Minor
> Fix For: 3.9.0
>
>
>  `createResponse`[0] references a variable defined outside its own scope, 
> which is error-prone because that variable might not be initialized when 
> `createResponse` executes. We should do a small refactor to pass 
> `unconvertedFetchResponse` to `createResponse`.
> [0] 
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L939


