[ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860514#comment-17860514
 ] 

yazgoo edited comment on KAFKA-10413 at 6/27/24 2:47 PM:
---------------------------------------------------------

Here is yet another simpler version of the script, with less workers and which 
does not try and restart any worker:

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | 
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 
--topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1"; \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic 
"test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the scrit ends, I have two each connector one worker with connector #1 
tasks, the other one with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     80     "worker_id": "k1:8081"
     20     "worker_id": "k2:8082"
     20     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 80 tasks on each workers, but for distribution 
reasons , I think it should be (40, 40, 40) for each connector


was (Author: yazgoo):
Here is yet another simpler version of the script, with less workers and which 
does not try and restart any worker:
{code:java}
 {code}
 

 
{code:java}
#!/bin/bash
set -xe
dkill() {
  docker stop "$1" || true
  docker rm -v -f "$1" || true
}
write_topic() {
  # write 200 messages to the topic
  json='{"name": "test"}'
  docker exec -i kafka bash -c "(for i in {1..200}; do echo '$json'; done) | 
/opt/kafka/bin/kafka-console-producer.sh --bootstrap-server 0.0.0.0:9092 
--topic test_topic$1"
}

launch_minio() {
  # Launch Minio (Fake S3)
  docker run --network host -d --name minio \
    -e MINIO_ROOT_USER=minioadmin \
    -e MINIO_ROOT_PASSWORD=minioadmin \
    minio/minio server --console-address :9001 /data
      docker exec -it minio mkdir /data/my-minio-bucket
}
launch_kafka_connect() {
  # Start Kafka Connect with S3 Connector
  docker run --network host -d --name "kafka-connect$1" \
    -e  AWS_ACCESS_KEY_ID=minioadmin \
    -e  AWS_SECRET_ACCESS_KEY=minioadmin \
    -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
    -e  CONNECT_LISTENERS="http://localhost:808$1"; \
    -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
    -e  CONNECT_REST_PORT="808$1" \
    -e  CONNECT_GROUP_ID="connect-cluster" \
    -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
    -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
    -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
    -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  
CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
    -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
    -e  
CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
    --entrypoint bash \
    confluentinc/cp-kafka-connect:7.6.1 \
    -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest 
&& /etc/confluent/docker/run"
}
cleanup_docker_env() {
  docker volume prune -f
  for container in $(for i in {1..9}; do echo "kafka-connect$i";done) kafka 
minio
  do
    dkill "$container"
  done
}
launch_kafka() {
  docker run --network host --hostname localhost --ulimit nofile=65536:65536 -d 
--name kafka -p 9092:9092 apache/kafka
  for i in {1..2}
  do
    # Create a Kafka topic
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 120 --topic 
"test_topic$i"
    write_topic "$i"
  done
  for topic in connect-configs connect-offsets connect-status
  do
    # with cleanup.policy=compact, we can't have more than 1 partition
    docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create 
--bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 --topic 
$topic --config cleanup.policy=compact
  done
}
cleanup_docker_env
launch_kafka
launch_minio
launch_kafka_connect 1
while true
do
  sleep 5
  # Check if Kafka Connect is up
  curl http://localhost:8081/ || continue
  break
done
sleep 10
for i in {1..2}
do
# Set up a connector
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "s3-connector'"$i"'",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "120",
    "topics": "test_topic'"$i"'",
    "s3.region": "us-east-1",
    "store.url": "http://0.0.0.0:9000";,
    "s3.bucket.name": "my-minio-bucket",
    "s3.part.size": "5242880",
    "flush.size": "3",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "schema.compatibility": "NONE"
  }
}' http://localhost:8081/connectors
done
launch_kafka_connect 2
launch_kafka_connect 3
{code}
 

 

When the scrit ends, I have two each connector one worker with connector #1 
tasks, the other one with connector #2 tasks.

Then I wait 3 minutes

And I get the final state:
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks 
|grep worker_id | sort | uniq -c
     60     "worker_id": "k2:8082"
     60     "worker_id": "k3:8083"{code}
 
{code:java}
❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks 
|grep worker_id | sort | uniq -c
     80     "worker_id": "k1:8081"
     20     "worker_id": "k2:8082"
     20     "worker_id": "k3:8083"
{code}
 

In the end, we indeed get 80 tasks on each workers, but for distribution 
reasons , I think it should be (40, 40, 40) for each connector

> rebalancing leads to unevenly balanced connectors
> -------------------------------------------------
>
>                 Key: KAFKA-10413
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10413
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 2.5.1
>            Reporter: yazgoo
>            Assignee: rameshkrishnan muthusamy
>            Priority: Major
>             Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
>         Attachments: connect_worker_balanced.png, rebalance.sh
>
>
> GHi,
> With CP 5.5, running kafka connect s3 sink on EC2 whith autoscaling enabled, 
> if a connect instance disappear, or a new one appear, we're seeing unbalanced 
> consumption, much like mentionned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one kafka connect instance taking most of the load and 
> consumption not being able to keep on.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestion on what we could do to mitigate this ?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to