[ https://issues.apache.org/jira/browse/KAFKA-17049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Greg Harris reassigned KAFKA-17049:
-----------------------------------

    Assignee: Greg Harris

> unbalanced connectors
> ---------------------
>
>                 Key: KAFKA-17049
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17049
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: yazgoo
>            Assignee: Greg Harris
>            Priority: Major
>
> This follows https://issues.apache.org/jira/browse/KAFKA-10413
> I run the following script, which:
> 1. runs one worker,
> 2. declares two connectors, and
> 3. adds two more workers.
>  
> {code:java}
> #!/bin/bash
> set -xe
> dkill() {
>   docker stop "$1" || true
>   docker rm -v -f "$1" || true
> }
> launch_minio() {
>   # Launch Minio (Fake S3)
>   docker run --network host -d --name minio \
>     -e MINIO_ROOT_USER=minioadmin \
>     -e MINIO_ROOT_PASSWORD=minioadmin \
>     minio/minio server --console-address :9001 /data
>   docker exec -it minio mkdir /data/my-minio-bucket
> }
> launch_kafka_connect() {
>   # Start Kafka Connect with S3 Connector
>   docker run --network host -d --name "kafka-connect$1" \
>     -e  AWS_ACCESS_KEY_ID=minioadmin \
>     -e  AWS_SECRET_ACCESS_KEY=minioadmin \
>     -e  CONNECT_REST_ADVERTISED_HOST_NAME="k$1" \
>     -e  CONNECT_LISTENERS="http://localhost:808$1" \
>     -e  CONNECT_BOOTSTRAP_SERVERS=0.0.0.0:9092 \
>     -e  CONNECT_REST_PORT="808$1" \
>     -e  CONNECT_GROUP_ID="connect-cluster" \
>     -e  CONNECT_CONFIG_STORAGE_TOPIC="connect-configs" \
>     -e  CONNECT_OFFSET_STORAGE_TOPIC="connect-offsets" \
>     -e  CONNECT_STATUS_STORAGE_TOPIC="connect-status" \
>     -e  CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
>     -e  CONNECT_INTERNAL_VALUE_CONVERTER_SCHEMAS_ENABLE=false \
>     -e  CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components" \
>     --entrypoint bash \
>     confluentinc/cp-kafka-connect:7.6.1 \
>     -c "confluent-hub install --no-prompt confluentinc/kafka-connect-s3:latest && /etc/confluent/docker/run"
> }
> cleanup_docker_env() {
>   docker volume prune -f
>   for container in $(for i in {1..9}; do echo "kafka-connect$i"; done) kafka minio
>   do
>     dkill "$container"
>   done
> }
> launch_kafka() {
>   docker run --network host --hostname localhost --ulimit nofile=65536:65536 \
>     -d --name kafka -p 9092:9092 apache/kafka
>   for i in {1..2}
>   do
>     # Create a Kafka topic
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
>       --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 12 \
>       --topic "test_topic$i"
>   done
>   for topic in connect-configs connect-offsets connect-status
>   do
>     # with cleanup.policy=compact, we can't have more than 1 partition
>     docker exec -it kafka /opt/kafka/bin/kafka-topics.sh --create \
>       --bootstrap-server 0.0.0.0:9092 --replication-factor 1 --partitions 1 \
>       --topic "$topic" --config cleanup.policy=compact
>   done
> }
> cleanup_docker_env
> launch_kafka
> launch_minio
> launch_kafka_connect 1
> while true
> do
>   sleep 5
>   # Check if Kafka Connect is up
>   curl http://localhost:8081/ || continue
>   break
> done
> sleep 10
> for i in {1..2}
> do
> # Set up a connector
> curl -X POST -H "Content-Type: application/json" --data '{
>   "name": "s3-connector'"$i"'",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "tasks.max": "12",
>     "topics": "test_topic'"$i"'",
>     "s3.region": "us-east-1",
>     "store.url": "http://0.0.0.0:9000",
>     "s3.bucket.name": "my-minio-bucket",
>     "s3.part.size": "5242880",
>     "flush.size": "3",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
>     "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
>     "schema.compatibility": "NONE"
>   }
> }' http://localhost:8081/connectors
> done
> launch_kafka_connect 2
> launch_kafka_connect 3
> {code}
>  
>  
> When the script ends, the first worker has taken all the connectors and tasks:
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"
> {code}
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
>     12     "worker_id": "k1:8081"
> {code}
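>  
> The same check can be generalized over every registered connector (a small helper sketch, assuming {{jq}} is available and the worker on port 8081 is reachable):
> {code:bash}
> # Print the per-worker task count for each connector on the cluster.
> for c in $(curl -s http://localhost:8081/connectors | jq -r '.[]'); do
>   echo "connector: $c"
>   curl -s "http://localhost:8081/connectors/$c/status" \
>     | jq -r '.tasks[].worker_id' | sort | uniq -c
> done
> {code}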
>  
> Then I wait a few minutes and get the final state:
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector2/status | jq .tasks | grep worker_id | sort | uniq -c
>      6     "worker_id": "k2:8082"
>      6     "worker_id": "k3:8083"
> {code}
>  
> {code:java}
> ❯ curl -s http://localhost:8081/connectors/s3-connector1/status | jq .tasks | grep worker_id | sort | uniq -c
>      8     "worker_id": "k1:8081"
>      2     "worker_id": "k2:8082"
>      2     "worker_id": "k3:8083"
> {code}
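>  
> Summing over both connectors confirms that the per-worker totals are even (same assumptions as the sketch above):
> {code:bash}
> # Total task count per worker across both connectors (expected: 8/8/8).
> for c in s3-connector1 s3-connector2; do
>   curl -s "http://localhost:8081/connectors/$c/status" | jq -r '.tasks[].worker_id'
> done | sort | uniq -c
> {code}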
>  
> In the end we do get 8 tasks on each worker, but I think the assignment should be (4, 4, 4) for each connector: not all connectors do the same amount of work, so balancing only the total task count still leads to a processing/network imbalance overall.
> In my test I always get the same outcome.
> This is consistent with what I see in production, which makes autoscaling 
> impossible to use as is.
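>  
> Note: {{scheduled.rebalance.max.delay.ms}} defaults to 5 minutes, which likely explains the few-minute wait before the final state appears. For quicker experiments it may help to shorten that window on the workers; a sketch, relying on the cp-kafka-connect image mapping CONNECT_* env vars onto worker configs:
> {code:bash}
> # Inside launch_kafka_connect, add one more env var to the docker run flags
> # to set scheduled.rebalance.max.delay.ms=10000 (default 300000, i.e. 5 minutes):
>     -e  CONNECT_SCHEDULED_REBALANCE_MAX_DELAY_MS=10000 \
> {code}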



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
