[jira] [Reopened] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2021-08-04 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo reopened KAFKA-10413:


Hi,

I'm reopening this because I'm still seeing this issue with CP 6.2.0, which 
ships with Kafka 2.8.0, a version this issue is marked as fixed in.
We see essentially the same behavior as described in the issue description.

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png
>
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
> if a Connect instance disappears, or a new one appears, we see unbalanced 
> consumption, much like mentioned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load, with 
> consumption unable to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestion on what we could do to mitigate this?
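For reference, the delete-and-recreate workaround mentioned above can be scripted against the standard Connect REST API. This is a hypothetical sketch (the `recreate_connector` helper, host, and connector name are placeholders, not taken from the issue):

```python
# Hypothetical sketch of the delete-and-recreate workaround, using only the
# standard Kafka Connect REST endpoints (GET/DELETE/POST /connectors).
import json
import urllib.request


def recreate_connector(base_url, name):
    """Fetch a connector's current config, delete it, and re-create it as-is."""
    # GET /connectors/{name} returns {"name": ..., "config": {...}, ...}
    with urllib.request.urlopen(f"{base_url}/connectors/{name}") as resp:
        config = json.load(resp)["config"]
    # DELETE /connectors/{name} removes the connector and its tasks.
    req = urllib.request.Request(f"{base_url}/connectors/{name}", method="DELETE")
    urllib.request.urlopen(req)
    # POST /connectors re-creates it, which forces a fresh task assignment.
    body = json.dumps({"name": name, "config": config}).encode()
    req = urllib.request.Request(
        f"{base_url}/connectors", data=body,
        headers={"Content-Type": "application/json"}, method="POST")
    urllib.request.urlopen(req)


# Example (placeholder host/name):
# recreate_connector("http://localhost:8083", "s3-connector")
```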



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2021-08-04 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-10413:
---
Description: 
Hi,

With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
if a Connect instance disappears, or a new one appears, we see unbalanced 
consumption, much like mentioned in this post:

[https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]


This usually leads to one Kafka Connect instance taking most of the load, with 
consumption unable to keep up.
Currently, we're "fixing" this by deleting the connector and re-creating it, 
but this is far from ideal.

Any suggestion on what we could do to mitigate this?








[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-12-20 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462669#comment-17462669
 ] 

yazgoo commented on KAFKA-12495:


Hi, any news on this issue?
I have been testing it, and it looks like it solves my imbalance issue with 
autoscaling.


Is there a plan to merge it?

Thanks

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm 
> based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, the implementation makes a bad assumption: after the revoking 
> rebalance completes, the member (worker) count will be the same as in the 
> previous round of rebalance.
>  
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 joins after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's see what happens in 
> this example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> but we didn't revoke any more C/T in this round, which causes an unbalanced 
> distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> Because we don't allow consecutive revocations in two consecutive 
> rebalances (under the same leader), we end up with this uneven distribution 
> in this situation. We should allow a consecutive rebalance to run another 
> round of revocation, so the C/T can be revoked and reassigned to the other 
> members in this case.
> expected:
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT4, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new member, 
> **and also revoked some C/T** 
> W1(delay: 0, assigned: [AC0, AT1, AT2], revoked: [AT3])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT4], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> // another round of rebalance to assign the newly revoked C/T to the other 
> members
> W1 rejoins with assignment: [
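The scenario quoted above can be reduced to a toy simulation. This is a deliberate simplification, not the actual IncrementalCooperativeAssignor code, and the `round_two` function and its parameter names are invented for illustration; the numbers mirror the 12 connectors/tasks example (W1 keeps 4, 8 are revoked, W2-W4 join empty):

```python
# A deliberately simplified model of the scenario above (NOT the real
# Connect assignor): W1 kept 4 of its 12 connectors/tasks after the revoking
# round, 8 sit in the revoked pool, and W4 joined before round two.

def round_two(keeper_load, revoked, new_members, allow_more_revocation):
    """Compute per-worker load after the second rebalance round."""
    loads = {"W1": keeper_load}
    fair = (keeper_load + revoked) // (1 + len(new_members))
    pool = revoked
    if allow_more_revocation and keeper_load > fair:
        # An extra revocation round lets W1 shed down to its fair share.
        pool += keeper_load - fair
        loads["W1"] = fair
    # Spread the pool as evenly as possible over the newly joined workers.
    for i, worker in enumerate(new_members):
        loads[worker] = pool // len(new_members) + (1 if i < pool % len(new_members) else 0)
    return loads

# Current behavior: revoked tasks go to the new members, W1 stays over-loaded.
print(round_two(4, 8, ["W2", "W3", "W4"], allow_more_revocation=False))
# → {'W1': 4, 'W2': 3, 'W3': 3, 'W4': 2}

# With a second revocation round, the distribution evens out.
print(round_two(4, 8, ["W2", "W3", "W4"], allow_more_revocation=True))
# → {'W1': 3, 'W2': 3, 'W3': 3, 'W4': 3}
```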

[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2022-01-18 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477980#comment-17477980
 ] 

yazgoo commented on KAFKA-10413:


Hi, any news on this issue?
I suspect it is linked to 
https://issues.apache.org/jira/browse/KAFKA-12495
The current way we found to mitigate this is to destroy and re-create the 
connectors.






[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2022-01-18 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17477980#comment-17477980
 ] 

yazgoo edited comment on KAFKA-10413 at 1/18/22, 3:35 PM:
--

Hi, any news on this issue?
I suspect it is linked to 
https://issues.apache.org/jira/browse/KAFKA-12495 (I tried the attached PR and 
did not see the imbalance occur).
The current way we found to mitigate this is to destroy and re-create the 
connectors.








[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-25 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17859918#comment-17859918
 ] 

yazgoo commented on KAFKA-10413:


Hi [~sagarrao], it did not fix it; I still have the issue with CP 7.6.






[jira] [Updated] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-10413:
---
Attachment: rebalance.sh






[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860408#comment-17860408
 ] 

yazgoo commented on KAFKA-10413:


Hello, I launched the attached script:

[^rebalance.sh]

In one of my tests, one connector ends up well balanced:
{code:java}
 ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     15     "worker_id": "k2:8082"
     15     "worker_id": "k3:8083"
     15     "worker_id": "k4:8084"
     15     "worker_id": "k5:8085"
     15     "worker_id": "k6:8086"
     15     "worker_id": "k7:8087"
     15     "worker_id": "k8:8088"
     15     "worker_id": "k9:8089"
{code}

And the other one unbalanced:

{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     27     "worker_id": "k1:8081"
     11     "worker_id": "k2:8082"
     12     "worker_id": "k3:8083"
     11     "worker_id": "k4:8084"
     12     "worker_id": "k5:8085"
     12     "worker_id": "k6:8086"
     11     "worker_id": "k7:8087"
     12     "worker_id": "k8:8088"
     12     "worker_id": "k9:8089"
{code}
 

Regards






[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860408#comment-17860408
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 1:38 PM:
-

Hello, I launched the attached script:

[^rebalance.sh]

In my test, after waiting for a few minutes, I get:

one connector well balanced:
{code:java}
 ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     15     "worker_id": "k2:8082"
     15     "worker_id": "k3:8083"
     15     "worker_id": "k4:8084"
     15     "worker_id": "k5:8085"
     15     "worker_id": "k6:8086"
     15     "worker_id": "k7:8087"
     15     "worker_id": "k8:8088"
     15     "worker_id": "k9:8089"
{code}

And the other one unbalanced:

{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     27     "worker_id": "k1:8081"
     11     "worker_id": "k2:8082"
     12     "worker_id": "k3:8083"
     11     "worker_id": "k4:8084"
     12     "worker_id": "k5:8085"
     12     "worker_id": "k6:8086"
     11     "worker_id": "k7:8087"
     12     "worker_id": "k8:8088"
     12     "worker_id": "k9:8089"
{code}
 

Regards








[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo commented on KAFKA-10413:


Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:


{code:java}
#!/bin/bash
set -xe

dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}

write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}

launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}

cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}

launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic $topic --config cleanup.policy=compact
  done
}

cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1

while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done

sleep 10

for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done

launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}

When the script ends, the tasks sit on two workers: one worker holds connector 
#1's tasks, the other holds connector #2's tasks.

Then I wait 3 minutes.

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     30     "worker_id": "k2:8082"
     30     "worker_id": "k3:8083"
     30     "worker_id": "k4:8084"
     30     "worker_id": "k5:8085" {code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     48     "worker_id": "k1:8081"
     18     "worker_id": "k2:8082"
     18     "worker_id": "k3:8083"
     18     "worker_id": "k4:8084"
     18     "worker_id": "k5:8085" {code}
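A quick back-of-the-envelope check on the s3-connector1 numbers above (plain arithmetic on the reported counts, no Connect APIs involved) shows how far this is from a fair assignment:

```python
# Observed s3-connector1 task counts from the output above.
observed = {"k1:8081": 48, "k2:8082": 18, "k3:8083": 18, "k4:8084": 18, "k5:8085": 18}

total = sum(observed.values())          # 120 tasks in total
fair = total // len(observed)           # an even split would be 24 per worker
excess = max(observed.values()) - fair  # k1 holds 24 tasks above its fair share

print(total, fair, excess)  # → 120 24 24
```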
 



[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:40 PM:
-

Here is yet another simpler version of the script, with less workers and which 
does not try and restart any worker:




 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | 
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 
--topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic 
"test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
launch_kafka_connect 4
launch_kafka_connect 5
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     30     "worker_id": "k2:8082"
     30     "worker_id": "k3:8083"
     30     "worker_id": "k4:8084"
     30     "worker_id": "k5:8085" {code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     48     "worker_id": "k1:8081"
     18     "worker_id": "k2:8082"
     18     "worker_id": "k3:8083"
     18     "worker_id": "k


[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:47 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | 
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 
--topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic 
"test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     80     "worker_id": "k1:8081"
     20     "worker_id": "k2:8082"
     20     "worker_id": "k3:8083"
{code}
 

In the end, we do get 80 tasks on each worker, but for distribution reasons I 
think it should be (40, 40, 40) for each connector
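The arithmetic behind that expectation can be sketched quickly (the per-worker counts below are the observed final assignment from the run above; the "even" split is what an assignor balancing each connector separately would produce):

```python
# Observed final assignment (tasks per worker) from the run above
observed = {
    "s3-connector1": {"k1:8081": 80, "k2:8082": 20, "k3:8083": 20},
    "s3-connector2": {"k1:8081": 0, "k2:8082": 60, "k3:8083": 60},
}
workers = ["k1:8081", "k2:8082", "k3:8083"]

# Aggregate load per worker: 80 tasks each, so the cluster looks balanced...
total = {w: sum(conn.get(w, 0) for conn in observed.values()) for w in workers}
print(total)  # every worker carries 80 tasks in total

# ...but per connector the spread is 80 vs 20; an even split of each
# connector's 120 tasks over 3 workers would be 40 per worker.
per_connector_even = 120 // len(workers)
print(per_connector_even)  # 40
```

So the assignor only evens out the aggregate count; within a single connector the partition-to-worker skew (and hence the consumption skew per topic) remains.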

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:52 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | 
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 
--topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic 
"test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.


{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081

[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:54 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | 
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 
--topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic 
"test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker with connector #1 tasks, the other one 
with connector #2 tasks.
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-conne


[jira] [Comment Edited] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 3:08 PM:
-

Here is yet another, simpler version of the script, with fewer workers, which 
does not try to restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | 
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 
--topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic 
"test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, all tasks of both connectors are assigned to the first 
worker:
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

After waiting about three minutes, I get the final state:
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"
{code}
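The per-connector checks above can be summarized across every connector in one pass. A minimal sketch, assuming a worker REST endpoint on localhost:8081 and relying only on grep/sort/uniq like the commands above (`summarize_assignment` is a hypothetical helper name, not part of the original script):

```shell
# Hypothetical helper: count tasks per worker from a connector's /status
# JSON read on stdin, most-loaded worker first.
summarize_assignment() {
  grep -o '"worker_id": *"[^"]*"' | sort | uniq -c | sort -rn
}

# Against a live worker, one could loop over every registered connector:
#   for c in $(curl -s http://localhost:8081/connectors | tr -d '[]"' | tr ',' ' '); do
#     echo "$c"
#     curl -s "http://localhost:8081/connectors/$c/status" | summarize_assignment
#   done
```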
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-conne

[jira] [Created] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)
yazgoo created KAFKA-17049:
--

 Summary: unbalanced connectors
 Key: KAFKA-17049
 URL: https://issues.apache.org/jira/browse/KAFKA-17049
 Project: Kafka
  Issue Type: Bug
Reporter: yazgoo


This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script (create two connectors on an existing 
worker, then add two more workers):


{code:bash}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with the S3 connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # the Connect config topic must have exactly one partition; keep all three at 1 here
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic "$topic" --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, all tasks of both connectors are assigned to the first 
worker:
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

After waiting about three minutes, I get the final state:
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"
{code}
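The waiting period before the final state is consistent with Connect's incremental cooperative rebalancing, which holds back reassignment of released tasks until a scheduled delay expires. A hedged sketch of how that delay could be shortened in the Docker setup above, via the worker config `scheduled.rebalance.max.delay.ms` (default 300000 ms); the 60000 ms value is an arbitrary example, not a recommendation:

```shell
# In launch_kafka_connect above, one extra environment variable would map to
# the worker's scheduled.rebalance.max.delay.ms setting:
docker run --network host -d --name "kafka-connect$1" \
  -e CONNECT_SCHEDULED_REBALANCE_MAX_DELAY_MS=60000 \
  ... # remaining -e flags exactly as in the script above
```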
 
{code:jav

[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860562#comment-17860562
 ] 

yazgoo commented on KAFKA-10413:


I created https://issues.apache.org/jira/browse/KAFKA-17049

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png, rebalance.sh
>
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
> if a connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like mentioned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestions on what we could do to mitigate this?
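As a possibly lighter-weight alternative to the delete-and-recreate workaround quoted above, Connect versions from Kafka 3.0 (KIP-745) expose a restart endpoint that restarts a connector together with all of its tasks; whether it actually rebalances assignments in this scenario is not verified here. A sketch, where the worker URL and connector name are placeholders:

```shell
# Hypothetical helper building the KIP-745 restart call (Kafka 3.0+):
# POST /connectors/{name}/restart?includeTasks=true&onlyFailedTasks=false
restart_connector_url() {
  printf '%s/connectors/%s/restart?includeTasks=true&onlyFailedTasks=false' "$1" "$2"
}

# Usage against a live worker (untested assumption in this setup):
#   curl -X POST "$(restart_connector_url http://localhost:8081 s3-connector1)"
```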



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Component/s: connect

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers


{code:bash}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with the S3 connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # the Connect config topic must have exactly one partition; keep all three at 1 here
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic "$topic" --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, all tasks of both connectors are assigned to the first 
worker:
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
 

After waiting about three minutes, I get the final state:
{code}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"
{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|gre



[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When runnning the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c \
    "(for i in {1..200}; do echo '$json'; done) | /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 --topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 \
      --topic "test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic "$topic" --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "120",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have one worker taking all the tasks of both connectors:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    120     "worker_id": "k1:8081"
{code}
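The grep/sort/uniq pipeline above can be wrapped in a small helper to check several connectors in one go; a sketch only (the function name and grep pattern are mine, not part of the original script):

```shell
# Hypothetical helper: tally tasks per worker from a connector's status JSON
# read on stdin -- the same grep | sort | uniq -c pipeline as above, reusable.
count_tasks_per_worker() {
  grep -o '"worker_id": *"[^"]*"' | sort | uniq -c
}
```

For example: `curl -s http://localhost:8081/connectors/s3-connector1/status | count_tasks_per_worker`.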
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which:

1. runs one worker
2. declares two connectors
3. adds two more workers

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
  docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e AWS_ACCESS_KEY_ID=minioadmin \
    -e AWS_SECRET_ACCESS_KEY=minioadmin \
    -e CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e CONNECT_LISTENERS="http://localhost:808$1" \
    -e CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e CONNECT_REST_PORT="808$1" \
    -e CONNECT_GROUP_ID="connect-cluster" \
    -e CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d \
    --name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 \
      --topic "test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
      --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
      --topic "$topic" --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
  # Set up a connector
  curl -X POST -H "Content-Type: application/json" --data '{
    "name": "s3-connector'"$i"'",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "12",
      "topics": "test_topic'"$i"'",
      "s3.region": "us-east-1",
      "store.url": "http://0.0.0.0:9000",
      "s3.bucket.name": "my-minio-bucket",
      "s3.part.size": "5242880",
      "flush.size": "3",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
      "schema.compatibility": "NONE"
    }
  }' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
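One detail worth noting in the curl call above: the `--data` payload mixes quote styles. The single-quoted JSON string is closed just before `$i`, the variable is expanded inside double quotes, and a single quote then reopens the literal. A minimal demo of the same splice (variable names here are mine):

```shell
# Quote splice as used in the connector payload above:
# '...' closes, "$i" expands, '...' reopens, yielding one string.
i=1
payload='{"name": "s3-connector'"$i"'"}'
echo "$payload"   # prints {"name": "s3-connector1"}
```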
 

 

When the script ends, I have one worker taking all the connectors/tasks:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     6     "worker_id": "k2:8082"
     6     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     8     "worker_id": "k1:8081"
     2     "worker_id": "k2:8082"
     2     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 8 tasks on each worker, but for distribution reasons I think it should be (4, 4, 4) for each connector, because all connectors don't do the
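The distinction can be made concrete by summing the final task counts per worker (numbers taken from the status outputs above): the total load is even, while each individual connector is skewed. A small awk tally, for illustration only:

```shell
# Final assignment from the outputs above:
# connector1 -> k1:8 k2:2 k3:2, connector2 -> k2:6 k3:6.
# Summing per worker: every worker ends up with 8 tasks in total,
# even though neither connector is spread (4, 4, 4) on its own.
printf '%s\n' "c1 k1 8" "c1 k2 2" "c1 k3 2" "c2 k2 6" "c2 k3 6" |
  awk '{ total[$2] += $3 } END { for (w in total) print w, total[w] }' | sort
```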

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When runnning the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1"; \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic 
"test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "12",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have the first worker taking all the connectors/tasks:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"
{code}
 

Then I wait a few minutes.

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     6     "worker_id": "k2:8082"
     6     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     8     "worker_id": "k1:8081"
     2     "worker_id": "k2:8082"
     2     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 8 tasks on each worker, but for distribution reasons, 
I think it should be (4, 4, 4) for each connector, because all connectors 
do
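As an illustration only (this is not the actual `IncrementalCooperativeAssignor` logic in Kafka Connect, and `assign_per_connector` is a hypothetical helper), round-robining each connector's tasks over the workers independently would yield the (4, 4, 4) split the reporter expects, rather than a balanced total count with a lopsided per-connector split:

```python
from collections import Counter

# Hypothetical sketch, not Kafka Connect code: round-robin each
# connector's tasks over the workers independently, so every
# connector (not just the total task count) ends up balanced.
def assign_per_connector(connectors, tasks_per_connector, workers):
    assignment = {w: [] for w in workers}
    for conn in connectors:
        for t in range(tasks_per_connector):
            worker = workers[t % len(workers)]
            assignment[worker].append((conn, t))
    return assignment

workers = ["k1", "k2", "k3"]
plan = assign_per_connector(["s3-connector1", "s3-connector2"], 12, workers)
for w in workers:
    per_conn = Counter(conn for conn, _ in plan[w])
    # each worker holds 8 tasks total, 4 from each connector
    print(w, len(plan[w]), dict(per_conn))
```

This only sketches the desired end state; a real assignor would also have to decide which tasks to revoke incrementally, which is where the observed (8, 2, 2) / (0, 6, 6) split comes from.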

[jira] [Updated] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-17049:
---
Description: 
This follows https://issues.apache.org/jira/browse/KAFKA-10413

When running the following script, which

1. runs one worker
2. declares two connectors
3. adds two more workers

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1" \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 --topic 
"test_topic$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "12",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the script ends, I have the first worker taking all the connectors/tasks:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"{code}
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
    12     "worker_id": "k1:8081"
{code}
 

Then I wait 3 minutes.

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     6     "worker_id": "k2:8082"
     6     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     8     "worker_id": "k1:8081"
     2     "worker_id": "k2:8082"
     2     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 8 tasks on each worker, but for distribution reasons, 
I think it should be (4, 4, 4) for each connector, because all connectors 
don't d

[jira] [Commented] (KAFKA-17049) unbalanced connectors

2024-06-27 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860647#comment-17860647
 ] 

yazgoo commented on KAFKA-17049:


Thanks [~gharris1727],

I'll try and have a look at it!

> unbalanced connectors
> -
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>

[jira] [Commented] (KAFKA-17049) Incremental rebalances assign too many tasks for the same connector together

2024-06-28 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860777#comment-17860777
 ] 

yazgoo commented on KAFKA-17049:


I have a first working solution, I'll clean it up and open a PR 
https://github.com/apache/kafka/compare/trunk...yazgoo:kafka:trunk

> Incremental rebalances assign too many tasks for the same connector together
> 
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>

[jira] [Commented] (KAFKA-17049) Incremental rebalances assign too many tasks for the same connector together

2024-06-28 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860793#comment-17860793
 ] 

yazgoo commented on KAFKA-17049:


I opened https://github.com/apache/kafka/pull/16486

> Incremental rebalances assign too many tasks for the same connector together
> 
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>

[jira] (KAFKA-17049) Incremental rebalances assign too many tasks for the same connector together

2024-06-28 Thread yazgoo (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-17049 ]


yazgoo deleted comment on KAFKA-17049:


was (Author: yazgoo):
I have a first working solution, I'll clean it up and open a PR 
https://github.com/apache/kafka/compare/trunk...yazgoo:kafka:trunk

> Incremental rebalances assign too many tasks for the same connector together
> 
>
> Key: KAFKA-17049
> URL: https://issues.apache.org/jira/browse/KAFKA-17049
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: yazgoo
>Priority: Major
>

[jira] [Created] (KAFKA-10323) NullPointerException during rebalance

2020-07-29 Thread yazgoo (Jira)
yazgoo created KAFKA-10323:
--

 Summary: NullPointerException during rebalance
 Key: KAFKA-10323
 URL: https://issues.apache.org/jira/browse/KAFKA-10323
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: yazgoo


*confluent platform version: 5.5.0-ccs*

connector used: s3

Connector stops after rebalancing:

ERROR [Worker clientId=connect-1, groupId=connect] Couldn't instantiate task 
 because it has an invalid task configuration. This task will not 
execute until reconfigured.

java.lang.NullPointerException
 at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:427)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted

2020-08-14 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177821#comment-17177821
 ] 

yazgoo commented on KAFKA-5896:
---

I still encountered this exception in CP 2.5, shouldn't this be re-opened?

> Kafka Connect task threads never interrupted
> 
>
> Key: KAFKA-5896
> URL: https://issues.apache.org/jira/browse/KAFKA-5896
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nick Pillitteri
>Assignee: Nick Pillitteri
>Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads. 
> When tasks are stopped or restarted, a flag is set - {{stopping}} - to 
> indicate the task should stop processing records. However, if the thread the 
> task is running in is blocked (waiting for a lock or performing I/O) it's 
> possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with 
> some more detailed instructions for reproducing the issue): 
> https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved 
> connector (any connector that does I/O without timeouts) can cause the Kafka 
> Connect worker to get into a state where the only solution is to restart the 
> JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on 
> Stack Overflow: 
> https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is 
> running in. In the past across various other libraries, this is what I've 
> seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run 
> indefinitely. It uses a timeout while waiting for the task to stop but after 
> this timeout has expired it simply sets a {{cancelled}} flag. This means that 
> every time a task is restarted, a new thread running the task will be 
> created. Thus a task may end up with multiple instances all running in their 
> own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here: 
> https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to 
> gracefully shutdown here: 
> https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and 
> submit a PR if people agree that this is a bug and interrupting threads is 
> the right fix.
> Thanks!
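The stop-with-timeout pattern described above can be sketched as follows. This is an illustration only, not Kafka Connect's `Worker` code: the linked Java patch escalates to `Thread.interrupt()` when the graceful wait times out, which Python threads cannot replicate, so this sketch just reports whether the graceful stop succeeded.

```python
import threading
import time

# Sketch of "set a stopping flag, then wait with a timeout".
class Task:
    def __init__(self):
        self.stopping = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # A well-behaved task polls the stopping flag; a task blocked
        # in I/O without a timeout would never reach this check, which
        # is exactly the failure mode described in the report.
        while not self.stopping.is_set():
            time.sleep(0.01)

    def start(self):
        self._thread.start()

    def stop(self, timeout=1.0):
        self.stopping.set()
        self._thread.join(timeout)          # wait for a graceful exit
        return not self._thread.is_alive()  # False would mean escalation is needed

task = Task()
task.start()
print(task.stop())
```

When `stop()` returns `False`, the worker is left with a zombie thread, and restarting the task spawns a second one, as the "Actual Result" section explains.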



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-5896) Kafka Connect task threads never interrupted

2020-08-14 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17177821#comment-17177821
 ] 

yazgoo edited comment on KAFKA-5896 at 8/14/20, 3:11 PM:
-

I still encountered this exception in CP 5.5, shouldn't this be re-opened ? 


was (Author: yazgoo):
I still encountered this exception in CP 2.5, shouldn't this be re-opened ? 

> Kafka Connect task threads never interrupted
> 
>
> Key: KAFKA-5896
> URL: https://issues.apache.org/jira/browse/KAFKA-5896
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nick Pillitteri
>Assignee: Nick Pillitteri
>Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads. 
> When tasks are stopped or restarted, a flag is set - {{stopping}} - to 
> indicate the task should stop processing records. However, if the thread the 
> task is running in is blocked (waiting for a lock or performing I/O) it's 
> possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with 
> some more detailed instructions for reproducing the issue): 
> https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved 
> connector (any connector that does I/O without timeouts) can cause the Kafka 
> Connect worker to get into a state where the only solution is to restart the 
> JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on 
> Stack Overflow: 
> https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is 
> running in. In the past across various other libraries, this is what I've 
> seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run 
> indefinitely. It uses a timeout while waiting for the task to stop but after 
> this timeout has expired it simply sets a {{cancelled}} flag. This means that 
> every time a task is restarted, a new thread running the task will be 
> created. Thus a task may end up with multiple instances all running in their 
> own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here: 
> https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to 
> gracefully shutdown here: 
> https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and 
> submit a PR if people agree that this is a bug and interrupting threads is 
> the right fix.
> Thanks!
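The two-step shutdown the reporter proposes (set the cooperative flag, wait out a grace period, then interrupt) can be sketched with a plain ExecutorService. This is an illustrative stand-in, not the actual Worker code; the long sleep stands in for blocking I/O that never observes the flag:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class TaskShutdownSketch {
    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean stopping = new AtomicBoolean(false);
        ExecutorService executor = Executors.newSingleThreadExecutor();

        // A task that honours the stopping flag but may block indefinitely:
        executor.submit(() -> {
            while (!stopping.get()) {
                try {
                    Thread.sleep(60_000); // stands in for blocking I/O
                } catch (InterruptedException e) {
                    // The interrupt is what actually unblocks the thread.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });

        stopping.set(true);   // step 1: cooperative flag (what the Worker does today)
        executor.shutdown();
        if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            executor.shutdownNow(); // step 2: interrupt after the timeout expires
        }
        boolean done = executor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("terminated=" + done);
    }
}
```

Without the `shutdownNow()` call, the flag alone never wakes the sleeping thread and the task would linger, which is the leak described above.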





[jira] [Created] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2020-08-18 Thread yazgoo (Jira)
yazgoo created KAFKA-10413:
--

 Summary: rebalancing leads to unevenly balanced connectors
 Key: KAFKA-10413
 URL: https://issues.apache.org/jira/browse/KAFKA-10413
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.1
Reporter: yazgoo


Hi,

With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, if 
a Connect instance disappears, or a new one appears, we're seeing unbalanced 
consumption, much like mentioned in this post:

[https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]


This usually leads to one Kafka Connect instance taking most of the load and 
consumption not being able to keep up.
Currently, we're "fixing" this by deleting the connector and re-creating it, 
but this is far from ideal.

Any suggestion on what we could do to mitigate this?
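The delete-and-recreate workaround described above goes through the Connect REST API (`DELETE /connectors/{name}` followed by `POST /connectors`). A minimal sketch, assuming a hypothetical worker at `localhost:8083` and connector name `my-s3-sink`; it only builds and prints the two requests, and the (illustrative) connector config would need the sink's real settings before sending with `HttpClient`:

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;

public class RecreateConnectorSketch {
    public static void main(String[] args) {
        String worker = "http://localhost:8083"; // hypothetical worker address
        String name = "my-s3-sink";              // hypothetical connector name

        // Step 1: delete the connector; its tasks are revoked from all workers.
        HttpRequest delete = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/connectors/" + name))
                .DELETE()
                .build();

        // Step 2: re-create it, forcing a fresh task assignment.
        String body = "{\"name\":\"" + name + "\",\"config\":{\"connector.class\":"
                + "\"io.confluent.connect.s3.S3SinkConnector\"}}";
        HttpRequest create = HttpRequest.newBuilder()
                .uri(URI.create(worker + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(BodyPublishers.ofString(body))
                .build();

        System.out.println(delete.method() + " " + delete.uri());
        System.out.println(create.method() + " " + create.uri());
        // Send both with java.net.http.HttpClient once a worker is reachable.
    }
}
```

This is a stopgap, not a fix: it forces the incremental cooperative rebalancer to hand out assignments from scratch, at the cost of briefly stopping consumption.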





[jira] [Commented] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2024-01-25 Thread yazgoo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810789#comment-17810789
 ] 

yazgoo commented on KAFKA-10413:


Hello,

One year later, with CP 7.5, I still have the issue.

Regards

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png
>
>
> Hi,
> With CP 5.5, running the Kafka Connect S3 sink on EC2 with autoscaling enabled, 
> if a Connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like mentioned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one Kafka Connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestion on what we could do to mitigate this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)