[jira] [Commented] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393655#comment-17393655 ] Daniel Urban commented on KAFKA-9747:

To add more details to the issue:
* When running multiple workers
* AND the Connector name contains a non-URL-compatible character
* AND a follower worker has the Connector in its assignment
* Then the follower->leader request sent over the Connect REST API fails in RestClient, without any error logging and without the corresponding future ever being completed

> No tasks created for a connector
>
> Key: KAFKA-9747
> URL: https://issues.apache.org/jira/browse/KAFKA-9747
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04 LTS
> Platform: Confluent Platform 5.4
> HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge
> Reporter: Vit Koma
> Assignee: Andras Katona
> Priority: Major
> Attachments: connect-distributed.properties, connect.log
>
> We are running Kafka Connect in a distributed mode on 3 nodes using Debezium (MongoDB) and Confluent S3 connectors. When adding a new connector via the REST API, the connector is created in RUNNING state, but no tasks are created for the connector.
> Pausing and resuming the connector does not help. When we stop all workers and then start them again, the tasks are created and everything runs as it should.
> The issue does not show up if we run only a single node.
> The issue is not caused by the connector plugins, because we see the same behaviour for both Debezium and S3 connectors. Also, in debug logs I can see that Debezium is correctly returning a task configuration from the Connector.taskConfigs() method.
> Connector configuration examples
> Debezium:
> {code}
> {
>   "name": "qa-mongodb-comp-converter-task|1",
>   "config": {
>     "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
>     "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
>     "mongodb.name": "qa-debezium-comp",
>     "mongodb.ssl.enabled": true,
>     "collection.whitelist": "converter[.]task",
>     "tombstones.on.delete": true
>   }
> }
> {code}
> S3 Connector:
> {code}
> {
>   "name": "qa-s3-sink-task|1",
>   "config": {
>     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
>     "topics": "qa-debezium-comp.converter.task",
>     "topics.dir": "data/env/qa",
>     "s3.region": "eu-west-1",
>     "s3.bucket.name": "",
>     "flush.size": "15000",
>     "rotate.interval.ms": "360",
>     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
>     "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
>     "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
>     "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
>     "schema.compatibility": "NONE",
>     "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "key.converter.schemas.enable": false,
>     "value.converter.schemas.enable": false,
>     "transforms": "ExtractDocument",
>     "transforms.ExtractDocument.type": "custom.kafka.connect.transforms.ExtractDocument$Value"
>   }
> }
> {code}
> The connectors are created using curl: {{curl -X POST -H "Content-Type: application/json" --data @ http:/:10083/connectors}}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
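Note how the connector names above (`qa-mongodb-comp-converter-task|1`, `qa-s3-sink-task|1`) contain a `|`, which is not URL-safe. The failure described in the comment comes down to the name being interpolated into the forwarded REST path without percent-encoding. A minimal sketch of the distinction, using only standard `java.net` classes; the helper name and worker URL are hypothetical, not Kafka's actual RestClient code:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class ConnectorUrlExample {
    // Hypothetical helper: builds the follower->leader forwarding path for a
    // connector-specific endpoint.
    static String tasksPath(String leaderUrl, String connectorName) throws Exception {
        // Without this encoding step, a name like "qa-s3-sink-task|1" yields
        // an invalid request target and the forwarded request fails.
        String encoded = URLEncoder.encode(connectorName, StandardCharsets.UTF_8.name());
        return leaderUrl + "/connectors/" + encoded + "/tasks";
    }

    public static void main(String[] args) throws Exception {
        System.out.println(tasksPath("http://leader:8083", "qa-s3-sink-task|1"));
        // -> http://leader:8083/connectors/qa-s3-sink-task%7C1/tasks
    }
}
```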
[GitHub] [kafka] lkokhreidze edited a comment on pull request #7170: KIP-221 / Add KStream#repartition operation
lkokhreidze edited a comment on pull request #7170: URL: https://github.com/apache/kafka/pull/7170#issuecomment-610434777

Hi @mjsax @vvcephei

Small update: https://github.com/apache/kafka/pull/7170/commits/f2bcdfe487ec13f41f4516d6bee6a1847f6d4ce2

In this commit I've added the topology optimization option as a test parameter. This PR touches topology optimization (indirectly), so to make sure that everything works as expected, I thought it would be beneficial to have the integration tests verify both the `topology.optimization: all` and `topology.optimization: none` configurations. Hope this makes sense.

Regards, Levani

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
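For context, parameterizing a Streams test over the two optimization modes amounts to toggling one config value. A minimal sketch, assuming the current `StreamsConfig` constants (`TOPOLOGY_OPTIMIZATION_CONFIG`, `OPTIMIZE`, `NO_OPTIMIZATION`); the test scaffolding itself is illustrative:

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class OptimizationParams {
    // The two parameter values the integration test would run with.
    static List<String> optimizationModes() {
        return List.of(StreamsConfig.OPTIMIZE, StreamsConfig.NO_OPTIMIZATION); // "all", "none"
    }

    static Properties streamsConfig(String optimizationMode) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "repartition-test-" + optimizationMode);
        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, optimizationMode);
        return props;
    }
}
```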
[jira] [Commented] (KAFKA-13163) Issue with MySql worker sink connector
[ https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393600#comment-17393600 ] Muddam Pullaiah Yadav commented on KAFKA-13163:

Can anyone help me with this issue?

> Issue with MySql worker sink connector
> --
>
> Key: KAFKA-13163
> URL: https://issues.apache.org/jira/browse/KAFKA-13163
> Project: Kafka
> Issue Type: Task
> Components: KafkaConnect
> Affects Versions: 2.1.1
> Environment: PreProd
> Reporter: Muddam Pullaiah Yadav
> Priority: Major
>
> Please help with the following issue. Really appreciate it!
>
> We are using an Azure HDInsight Kafka cluster.
> My sink properties:
>
> cat mysql-sink-connector
> {
>   "name": "mysql-sink-connector",
>   "config": {
>     "tasks.max": "2",
>     "batch.size": "1000",
>     "batch.max.rows": "1000",
>     "poll.interval.ms": "500",
>     "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
>     "connection.url": "jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_test_dev",
>     "table.name": "db_test_dev.tbl_clients_merchants",
>     "topics": "test",
>     "connection.user": "grabmod",
>     "connection.password": "#admin",
>     "auto.create": "true",
>     "auto.evolve": "true",
>     "value.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "value.converter.schemas.enable": "false",
>     "key.converter": "org.apache.kafka.connect.json.JsonConverter",
>     "key.converter.schemas.enable": "true"
>   }
> }
>
> [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
> at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: null
> at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743)
> at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
> at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
> ... 13 more
> [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
> [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, groupId=connect-mysql-sink-connector] Sending LeaveGroup request to coordinator wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 2147482646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
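A note on the failure shown in that trace: the `DataException` comes from `JsonConverter.toConnectData`, and the config sets `key.converter.schemas.enable=true`, which makes the key converter expect a `{"schema": ..., "payload": ...}` envelope on every record key. A minimal sketch of that envelope requirement; the topic name and key bytes are illustrative, and the exact exception message depends on the record shape:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.json.JsonConverter;

public class KeyConverterCheck {
    public static void main(String[] args) {
        JsonConverter keyConverter = new JsonConverter();
        // Mirror the sink config above: key.converter.schemas.enable=true
        keyConverter.configure(Map.of("schemas.enable", "true"), /* isKey */ true);

        // With schemas.enable=true the converter expects an envelope on every key:
        byte[] enveloped = "{\"schema\":{\"type\":\"string\"},\"payload\":\"client-42\"}"
            .getBytes(StandardCharsets.UTF_8);
        SchemaAndValue ok = keyConverter.toConnectData("test", enveloped);
        System.out.println(ok.value()); // client-42

        // A bare key with no schema envelope cannot be converted; the resulting
        // DataException is what the error handler ultimately surfaces as
        // "Tolerance exceeded in error handler".
        byte[] bare = "\"client-42\"".getBytes(StandardCharsets.UTF_8);
        keyConverter.toConnectData("test", bare); // throws DataException
    }
}
```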
[jira] [Updated] (KAFKA-13163) Issue with MySql worker sink connector
[ https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Muddam Pullaiah Yadav updated KAFKA-13163: Reviewer: Ryan Dielhenn (was: saurab)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13163) Issue with MySql worker sink connector
[ https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Muddam Pullaiah Yadav updated KAFKA-13163: -- Reviewer: saurab (was: Ryan Dielhenn) Description: Please help with the following issue. Really appreciate it! We are using Azure HDInsight Kafka cluster My sink Properties: cat mysql-sink-connector { "name":"mysql-sink-connector", "config": { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", "poll.interval.ms":"500", "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_test_dev", "table.name":"db_test_dev.tbl_clients_merchants", "topics":"test", "connection.user":"grabmod", "connection.password":"#admin", "auto.create":"true", "auto.evolve":"true", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":"true" } } [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177) org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: null at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743) at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 
13 more [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178) [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, groupId=connect-mysql-sink-connector] Sending LeaveGroup request to coordinator wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 2147482646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782) was: Please help with the following issue. Really appreciate it! We are using Azure HDInsight Kafka cluster My sink Properties: cat mysql-sink-connector { "name":"mysql-sink-connector", "config": { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", "poll.interval.ms":"500", "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_grab_dev", "table.name":"db_grab_dev.tbl_clients_merchants", "topics":"test", "connection.user":"grabmod", "connection.password":"#admin", "auto.create":"true", "auto.evolve":"true", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":"true" } } [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177) org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:1
[jira] [Updated] (KAFKA-13163) Issue with MySql worker sink connector
[ https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Muddam Pullaiah Yadav updated KAFKA-13163: Reviewer: Ryan Dielhenn Issue Type: Task (was: Bug)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #11178: KAFKA-13168: KRaft observers should not have a replica id
jsancio commented on a change in pull request #11178: URL: https://github.com/apache/kafka/pull/11178#discussion_r683111844

## File path: core/src/main/scala/kafka/raft/RaftManager.scala ##

@@ -180,6 +181,12 @@ class KafkaRaftManager[T](
     val expirationService = new TimingWheelExpirationService(expirationTimer)
     val quorumStateStore = new FileBasedStateStore(new File(dataDir, "quorum-state"))
+    val nodeId = if (config.processRoles.contains(BrokerRole) && !config.processRoles.contains(ControllerRole)) {
+      OptionalInt.empty()
+    } else {
+      OptionalInt.of(config.nodeId)
+    }
+

Review comment:
```suggestion
    val nodeId = if (config.processRoles.contains(ControllerRole)) {
      OptionalInt.of(config.nodeId)
    } else {
      OptionalInt.empty()
    }
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
jsancio commented on a change in pull request #11179: URL: https://github.com/apache/kafka/pull/11179#discussion_r683110083

## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ##

@@ -1949,6 +1951,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     )
   }
+    val voterIds: Set[Integer] = if (usesSelfManagedQuorum) RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else Set.empty
+    if (processRoles.contains(ControllerRole)) {
+      // Ensure that controllers use their node.id as a voter in controller.quorum.voters
+      require(voterIds.contains(nodeId), s"The controller must contain a voter for it's ${KafkaConfig.NodeIdProp}=$nodeId in ${RaftConfig.QUORUM_VOTERS_CONFIG}.")
+    } else {
+      // Ensure that the broker's node.id is not an id in controller.quorum.voters
+      require(!voterIds.contains(nodeId), s"Since ${KafkaConfig.ProcessRolesProp}=broker, the the broker's ${KafkaConfig.NodeIdProp}=nodeId should not be in ${RaftConfig.QUORUM_VOTERS_CONFIG}")

Review comment (on the broker-only check): How about "If ${process role config...} does not contain the 'controller' role, the node id $nodeId must not be included in the set of voters ${QUORUM_VOTERS_CONFIG}=$votersIds"?

Review comment (on the controller check): How about "If ${process role config...} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QUORUM_VOTERS_CONFIG}=$votersIds"?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on pull request #10259: MINOR: Provide valid examples in README page.
kamalcph commented on pull request #10259: URL: https://github.com/apache/kafka/pull/10259#issuecomment-893137436 @ijuma Can you please review and merge this patch? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests
wcarlson5 commented on pull request #11153: URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087 @vvcephei looks like it is working now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683102378

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##

@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment: Well, I removed the whole `assignedNamedTopologies` thing from this PR, but yes, that was one of the original intentions of adding it to the AssignmentInfo. I'm leaning towards keeping things simple for now and avoiding adding on to the AssignmentInfo where possible. We can revisit this if the extra rebalances become a problem. It could be sufficient to just use the set of actually-assigned topologies as a proxy. It's not perfect, since it could be that the leader was aware of a topology and simply happened to not assign any of its tasks to this client, but I'd bet this gets rid of most of the unnecessary rebalances. That said, I'm saving this optimization for a followup Pt. 4 PR. I'll revisit the best way to handle the `builders` map in that as well.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683096288

## File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java ##

@@ -67,13 +72,31 @@ public class NamedTopologyIntegrationTest {
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+    // TODO KAFKA-12648:

Review comment: I'm still filling out the integration test suite, especially the multi-node testing, but I'll make sure this scenario has coverage. This will probably have to be in the followup Pt. 4, which expands `add/removeNamedTopology` to return a `Future`, since being able to block on this helps a lot with the testing.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683095231

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ##

@@ -867,6 +873,36 @@ private void initializeAndRestorePhase() {
     log.debug("Idempotent restore call done. Thread state has not changed.");
 }
+private void handleTopologyUpdatesPhase() {
+    // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
+    // or if this is the very first topology in which case we may need to wait for it to be non-empty
+    if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || lastSeenTopologyVersion == 0) {
+        try {

Review comment: I refactored this part of the code quite a bit, let me know if you have any remaining concerns with the new logic

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr opened a new pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters
dielhennr opened a new pull request #11179: URL: https://github.com/apache/kafka/pull/11179

Under certain configurations it is possible for the Kafka server to boot up as a broker only but still be the cluster metadata quorum leader. We should validate the configuration to avoid this case. https://issues.apache.org/jira/browse/KAFKA-13165

Tested manually by starting up a broker and a controller, both with valid and invalid configurations.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
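The rule being enforced is symmetric: a node with the `controller` role must list its own `node.id` in `controller.quorum.voters`, and a broker-only node must not. A minimal sketch of that check; the actual patch is Scala in `KafkaConfig`, so the Java method and message strings here are purely illustrative:

```java
import java.util.Set;

public class QuorumConfigCheck {
    // Illustrative only: mirrors the validation rule described in this PR.
    static void validate(int nodeId, Set<String> processRoles, Set<Integer> voterIds) {
        boolean isController = processRoles.contains("controller");
        if (isController && !voterIds.contains(nodeId)) {
            throw new IllegalArgumentException("node.id=" + nodeId
                + " must be listed in controller.quorum.voters when process.roles contains 'controller'");
        }
        if (!isController && voterIds.contains(nodeId)) {
            throw new IllegalArgumentException("node.id=" + nodeId
                + " must not be listed in controller.quorum.voters when the node is a broker only");
        }
    }

    public static void main(String[] args) {
        validate(1, Set.of("controller"), Set.of(1, 2, 3)); // ok: controller 1 is a voter
        validate(4, Set.of("broker"), Set.of(1, 2, 3));     // ok: broker 4 is not a voter
        validate(2, Set.of("broker"), Set.of(1, 2, 3));     // throws: broker id collides with a voter id
    }
}
```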
[GitHub] [kafka] ableegoldman commented on pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on pull request #10788: URL: https://github.com/apache/kafka/pull/10788#issuecomment-893124886 @guozhangwang I think I've addressed all your feedback and significantly cleaned up the StreamThread event loop + topology locking; let me know if there's anything else. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
ccding commented on a change in pull request #0: URL: https://github.com/apache/kafka/pull/0#discussion_r683068699 ## File path: core/src/main/scala/kafka/log/LogConfig.scala ## @@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp) val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp) - val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) - val localRetentionMs: Long = { -val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) + class TieredLogConfig { Review comment: Done. Thanks for the comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr opened a new pull request #11178: KAFKA-13168: KRaft observers should not have a replica id
dielhennr opened a new pull request #11178: URL: https://github.com/apache/kafka/pull/11178

Fix the `KafkaRaftClient` to use `Optional.empty` as its localId so that the sentinel replica id `-1` is sent as the replicaId in the `FetchRequest`.

https://issues.apache.org/jira/browse/KAFKA-13168

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
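In other words, a broker-only node participates in the metadata quorum as an observer: it fetches the log but must not identify itself as a voting replica. A tiny sketch of the id selection under the semantics described above; the class and method names are illustrative, not the actual `KafkaRaftClient` code:

```java
import java.util.OptionalInt;

public class ObserverReplicaId {
    // A broker-only node has no local voter id; a controller uses its node.id.
    static OptionalInt localId(boolean isController, int nodeId) {
        return isController ? OptionalInt.of(nodeId) : OptionalInt.empty();
    }

    // The FetchRequest carries -1 as the sentinel replica id for observers.
    static int fetchReplicaId(OptionalInt localId) {
        return localId.orElse(-1);
    }

    public static void main(String[] args) {
        System.out.println(fetchReplicaId(localId(false, 2))); // -1 (observer)
        System.out.println(fetchReplicaId(localId(true, 2)));  // 2  (voter)
    }
}
```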
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683065432

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ##

@@ -53,44 +56,162 @@
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile("");
     private final StreamsConfig config;
-    private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+    private final TopologyVersion version;
+
+    private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
     private ProcessorTopology globalTopology;
-    private Map<String, StateStore> globalStateStores = new HashMap<>();
-    final Set<String> allInputTopics = new HashSet<>();
+    private final Map<String, StateStore> globalStateStores = new HashMap<>();
+    private final Set<String> allInputTopics = new HashSet<>();
+
+    public static class TopologyVersion {
+        public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version
+        public Set<String> assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned
+        public ReentrantLock topologyLock = new ReentrantLock();
+        public Condition topologyCV = topologyLock.newCondition();
+    }
-    public TopologyMetadata(final InternalTopologyBuilder builder, final StreamsConfig config) {
+    public TopologyMetadata(final InternalTopologyBuilder builder,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
-        builders = new TreeMap<>();
+        builders = new ConcurrentSkipListMap<>();
         if (builder.hasNamedTopology()) {
             builders.put(builder.topologyName(), builder);
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
     }
-    public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+    public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
+                            final StreamsConfig config) {
+        version = new TopologyVersion();
         this.config = config;
+        this.builders = builders;
         if (builders.isEmpty()) {
-            log.debug("Building KafkaStreams app with no empty topology");
+            log.debug("Starting up empty KafkaStreams app with no topology");
         }
     }
-    public int getNumStreamThreads(final StreamsConfig config) {
-        final int configuredNumStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+    public void updateCurrentAssignmentTopology(final Set<String> assignedNamedTopologies) {
+        version.assignedNamedTopologies = assignedNamedTopologies;
+    }
-        // If the application uses named topologies, it's possible to start up with no topologies at all and only add them later
-        if (builders.isEmpty()) {
-            if (configuredNumStreamThreads != 0) {
-                log.info("Overriding number of StreamThreads to zero for empty topology");
+    /**
+     * @return the set of named topologies that the assignor distributed tasks for during the last rebalance
+     */
+    public Set<String> assignmentNamedTopologies() {
+        return version.assignedNamedTopologies;
+    }
+
+    public long topologyVersion() {
+        return version.topologyVersion.get();
+    }
+
+    public void lock() {
+        version.topologyLock.lock();
+    }
+
+    public void unlock() {
+        version.topologyLock.unlock();
+    }
+
+    public InternalTopologyBuilder getBuilderForTopologyName(final String name) {
+        return builders.get(name);
+    }
+
+    /**
+     * @throws IllegalStateException if the thread is not already holding the lock via TopologyMetadata#lock
+     */
+    public void maybeWaitForNonEmptyTopology() {
+        if (!version.topologyLock.isHeldByCurrentThread()) {

Review comment: Agreed, let me clean this up

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683057288

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##

@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon
     }
 }
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+    tasks.maybeCreateTasksFromNewTopologies();
+    for (final Task task : activeTaskIterable()) {
+        if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+            task.freezeProcessing();

Review comment: This is another stupid bug... it should be freezing them if they don't appear in the current topology. Tasks from removed named topologies are frozen to prevent them from processing any more records, so yes, it is meant to be permanent.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
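For reference, the corrected check described in that comment simply negates the condition. A sketch using the same identifiers as the hunk above (illustrative, not the merged patch):

```java
// Freeze tasks belonging to topologies that were removed, not ones still present.
for (final Task task : activeTaskIterable()) {
    final String topologyName = task.id().namedTopology();
    if (!topologyMetadata.namedTopologiesView().contains(topologyName)) {
        task.freezeProcessing(); // removed topology: permanently stop processing
    }
}
```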
[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
ableegoldman commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r683055067

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ##

@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
     this.mainConsumer = mainConsumer;
 }
+void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final Set<TaskId> assignedActiveTasks,
+                                       final Set<TaskId> assignedStandbyTasks) {
+    activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment: Whoops, that's what happens when you push changes late at night 🙃 I'll fix it up... hopefully it will make more sense then.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan commented on pull request #11171: URL: https://github.com/apache/kafka/pull/11171#issuecomment-893071380 Ok, tested locally (with the correct jar this time) and it looked good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan removed a comment on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan removed a comment on pull request #11171: URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311 Just tested locally and something is still wrong here. Need to follow up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan edited a comment on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan edited a comment on pull request #11171: URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311 Just tested locally and something is still wrong here. Need to follow up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan commented on pull request #11171: URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311 Just tested and something is still wrong here. Need to follow up. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13168) KRaft observers should not have a replica id
[ https://issues.apache.org/jira/browse/KAFKA-13168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn reassigned KAFKA-13168: Assignee: Ryan Dielhenn

> KRaft observers should not have a replica id
>
> Key: KAFKA-13168
> URL: https://issues.apache.org/jira/browse/KAFKA-13168
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Reporter: Jose Armando Garcia Sancio
> Assignee: Ryan Dielhenn
> Priority: Blocker
> Labels: kip-500
> Fix For: 3.0.0
>
> To avoid a misconfiguration of a broker affecting the quorum of the cluster metadata partition, when a Kafka node is configured as broker only, the replica id for the KRaft client should be set to {{Optional::empty()}}.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryan Dielhenn reassigned KAFKA-13165: Assignee: Ryan Dielhenn

> Validate node id, process role and quorum voters
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
> Issue Type: Sub-task
> Components: kraft
> Reporter: Jose Armando Garcia Sancio
> Assignee: Ryan Dielhenn
> Priority: Blocker
> Labels: kip-500
> Fix For: 3.0.0
>
> Under certain configurations it is possible for the Kafka server to boot up as a broker only but still be the cluster metadata quorum leader. We should validate the configuration to avoid this case.
> # If the {{process.roles}} contains {{controller}} then the {{node.id}} needs to be in the {{controller.quorum.voters}}
> # If the {{process.roles}} doesn't contain {{controller}} then the {{node.id}} cannot be in the {{controller.quorum.voters}}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests
[ https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393508#comment-17393508 ] Konstantine Karantasis commented on KAFKA-12724:

The PR got merged, so I'm resolving this issue. Thanks [~vvcephei] and [~ckamal]! If any issues persist in system tests, we can always reopen this ticket.

> Add 2.8.0 to system tests and streams upgrade tests
>
> Key: KAFKA-12724
> URL: https://issues.apache.org/jira/browse/KAFKA-12724
> Project: Kafka
> Issue Type: Task
> Components: streams, system tests
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Blocker
> Fix For: 3.0.0
>
> Kafka v2.8.0 is released. We should add this version to the system tests.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-13160) Fix the code that calls the broker’s config handler to pass the expected default resource name when using KRaft.
[ https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-13160. Resolution: Fixed

> Fix the code that calls the broker’s config handler to pass the expected default resource name when using KRaft.
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.0
> Reporter: Ryan Dielhenn
> Assignee: Ryan Dielhenn
> Priority: Blocker
> Fix For: 3.0.0
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode /brokers/<default>. Without this fix, when dynamic configs from snapshots are processed by the KRaft brokers, the BrokerConfigHandler checks if the resource name is "<default>" to do a default update, and otherwise converts the resource name to an integer to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with empty string instead of "<default>". This was causing the BrokerConfigHandler to throw a NumberFormatException for dynamic default broker configs, since the resource name for them is not "<default>" or a single integer. The code that calls the handler method for config changes should be fixed to pass "<default>" instead of empty string to the handler method if using KRaft.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] joel-hamill closed pull request #4536: [WIP] Migrate HTML to RST
joel-hamill closed pull request #4536: URL: https://github.com/apache/kafka/pull/4536 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates
cmccabe merged pull request #11168: URL: https://github.com/apache/kafka/pull/11168 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan commented on a change in pull request #11171: URL: https://github.com/apache/kafka/pull/11171#discussion_r683019982

## File path: core/src/main/scala/kafka/log/Log.scala ##

@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+    // defensively check that any newly assigned topic ID matches any that is already set
+    _topicId.foreach { current =>
+      if (!current.equals(topicId))

Review comment: We can. I was just looking at this case again.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
hachikuji commented on a change in pull request #11171: URL: https://github.com/apache/kafka/pull/11171#discussion_r683019148

## File path: core/src/main/scala/kafka/log/Log.scala ##

@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+    // defensively check that any newly assigned topic ID matches any that is already set
+    _topicId.foreach { current =>
+      if (!current.equals(topicId))
+        // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower

Review comment (on the id check): Can we shortcut return if the current topicId is already defined and matches the provided topicId?

Review comment (on the comment line): nit: fix alignment (just use braces 😉. I won't tell anyone)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
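The suggested shortcut, rendered as a small Java-flavored sketch (the real method lives in the Scala `Log.scala`; the class, field, and `UUID` type here are illustrative stand-ins): return early when the existing id matches, and only fail when a different id is already set.

```java
import java.util.UUID;

class TopicIdHolder {
    private UUID topicId; // null until a topic ID has been assigned

    void assignTopicId(UUID newId) {
        if (topicId != null) {
            if (topicId.equals(newId)) {
                return; // same id already assigned: nothing to do
            }
            // should never happen: mismatches are caught earlier, in becomeLeaderOrFollower
            throw new IllegalStateException("Topic ID " + newId
                + " does not match the existing id " + topicId);
        }
        topicId = newId;
    }
}
```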
[GitHub] [kafka] hachikuji merged pull request #11177: KAFKA-13167; KRaft broker should send heartbeat immediately after starting controlled shutdown
hachikuji merged pull request #11177: URL: https://github.com/apache/kafka/pull/11177 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13168) KRaft observers should not have a replica id
Jose Armando Garcia Sancio created KAFKA-13168:

Summary: KRaft observers should not have a replica id
Key: KAFKA-13168
URL: https://issues.apache.org/jira/browse/KAFKA-13168
Project: Kafka
Issue Type: Bug
Components: kraft
Reporter: Jose Armando Garcia Sancio
Fix For: 3.0.0

To avoid a misconfiguration of a broker affecting the quorum of the cluster metadata partition, when a Kafka node is configured as broker only, the replica id for the KRaft client should be set to {{Optional::empty()}}.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13165: Labels: kip-500 (was: )

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13165: Fix Version/s: 3.0.0

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-13165: Priority: Blocker (was: Major)

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393486#comment-17393486 ] Jose Armando Garcia Sancio commented on KAFKA-13165:

We discussed this offline. We decided to have controllers and brokers share the same id space. In other words, it is expected that the id of a controller-only node doesn't conflict with the id of a broker-only node. I am going to make the validation described in this Jira a blocker for 3.0.

> Validate node id, process role and quorum voters
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
> Issue Type: Sub-task
> Components: kraft
> Reporter: Jose Armando Garcia Sancio
> Priority: Major
>
> Under certain configurations it is possible for the Kafka server to boot up as a broker only but still be the cluster metadata quorum leader. We should validate the configuration to avoid this case.
> # If the {{process.roles}} contains {{controller}} then the {{node.id}} needs to be in the {{controller.quorum.voters}}
> # If the {{process.roles}} doesn't contain {{controller}} then the {{node.id}} cannot be in the {{controller.quorum.voters}}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei merged pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei merged pull request #10602: URL: https://github.com/apache/kafka/pull/10602 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei commented on pull request #10602: URL: https://github.com/apache/kafka/pull/10602#issuecomment-893016769 There were still 21 test failures, but they do not look related to this change. There are also fixes to other tests ongoing in parallel, so I'm going to go ahead and merge this one. If it turns out to be the cause of new problems, we should just revert it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #11177: KAFKA-13167; KRaft broker should send heartbeat immediately after starting controlled shutdown
hachikuji opened a new pull request #11177: URL: https://github.com/apache/kafka/pull/11177 Controlled shutdown in KRaft is signaled through a heartbeat request with the `shouldShutDown` flag set to true. When we begin controlled shutdown, we should immediately schedule the next heartbeat instead of waiting for the next periodic heartbeat. This allows the broker to shut down more quickly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
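A rough sketch of that scheduling change using a plain `ScheduledExecutorService`; the class and method names are invented for illustration, and the real broker uses its own heartbeat manager and `BrokerHeartbeatRequest` machinery:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: fire one extra heartbeat as soon as controlled shutdown begins,
// rather than waiting up to a full period for the next scheduled one.
public class HeartbeatScheduler {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long periodMs = 2000; // made-up heartbeat interval

    void start() {
        scheduler.scheduleAtFixedRate(this::sendHeartbeat, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void beginControlledShutdown() {
        scheduler.execute(this::sendHeartbeat); // immediate, instead of waiting periodMs
    }

    void sendHeartbeat() {
        System.out.println("heartbeat sent"); // stand-in for the real request
    }

    public static void main(String[] args) throws InterruptedException {
        HeartbeatScheduler hb = new HeartbeatScheduler();
        hb.start();
        Thread.sleep(500);
        hb.beginControlledShutdown(); // prints right away, ~1500ms before the next periodic one
        Thread.sleep(100);
        hb.scheduler.shutdownNow();
    }
}
```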
[GitHub] [kafka] guozhangwang commented on a change in pull request #11151: MINOR: Should commit a task if the consumer position advanced as well
guozhangwang commented on a change in pull request #11151: URL: https://github.com/apache/kafka/pull/11151#discussion_r682935981 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -1158,7 +1158,38 @@ public String toString(final String indent) { @Override public boolean commitNeeded() { -return commitNeeded; +// we need to do an extra check if the flag was false, that +// if the consumer position has been updated; this is because +// there may be non data records such as control markers bypassed +if (commitNeeded) { +return true; +} else { +for (final Map.Entry entry : consumedOffsets.entrySet()) { +final TopicPartition partition = entry.getKey(); +try { +final long offset = mainConsumer.position(partition); + +// note the position in consumer is the "next" record to fetch, +// so it should be larger than the consumed offset by 1; if it is +// more than 1 it means there are skipped offsets Review comment: ack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
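A toy illustration of the position-versus-consumed-offset reasoning in that review thread; the values and names are made up:

```java
// Illustrative numbers only; in the real code the offsets come from the task's
// consumedOffsets map and mainConsumer.position(partition).
public class CommitCheckExample {
    public static void main(String[] args) {
        long consumedOffset = 41; // last data record the task processed
        long position = 45;       // consumer position: the "next" offset to fetch

        // position == consumedOffset + 1 means nothing was skipped; a larger gap
        // means non-data records (e.g. transaction control markers) were passed
        // over, so the task should still commit even though the flag stayed false.
        boolean commitNeeded = position > consumedOffset + 1;
        System.out.println("commitNeeded = " + commitNeeded); // true here
    }
}
```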
[GitHub] [kafka] guozhangwang edited a comment on pull request #11155: MINOR: Fix flaky shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound
guozhangwang edited a comment on pull request #11155: URL: https://github.com/apache/kafka/pull/11155#issuecomment-890272874 ping @cadonna @ableegoldman for reviews. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
[ https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Walker Carlson reassigned KAFKA-12935: -- Assignee: Walker Carlson > Flaky Test > RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore > > > Key: KAFKA-12935 > URL: https://issues.apache.org/jira/browse/KAFKA-12935 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Matthias J. Sax >Assignee: Walker Carlson >Priority: Critical > Labels: flaky-test > > {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374) > {quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 opened a new pull request #11176: MINOR: use relative counts for shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
wcarlson5 opened a new pull request #11176: URL: https://github.com/apache/kafka/pull/11176 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown
Jason Gustafson created KAFKA-13167: --- Summary: KRaft broker should heartbeat immediately during controlled shutdown Key: KAFKA-13167 URL: https://issues.apache.org/jira/browse/KAFKA-13167 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson Controlled shutdown in KRaft is signaled through a heartbeat request with the `shouldShutDown` flag set to true. When we begin controlled shutdown, we should immediately schedule the next heartbeat instead of waiting for the next periodic heartbeat so that we can shut down more quickly. Otherwise, controlled shutdown can be delayed by several seconds. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] joel-hamill commented on pull request #11163: MINOR: doc change for minisr to clarify replicas in Kafka Config
joel-hamill commented on pull request #11163: URL: https://github.com/apache/kafka/pull/11163#issuecomment-892895687 LGTM, had one comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] joel-hamill commented on a change in pull request #11163: MINOR: doc change for minisr to clarify replicas in Kafka Config
joel-hamill commented on a change in pull request #11163: URL: https://github.com/apache/kafka/pull/11163#discussion_r682878756 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -826,10 +826,11 @@ object KafkaConfig { "a write for the write to be considered successful. If this minimum cannot be met, " + "then the producer will raise an exception (either NotEnoughReplicas or " + "NotEnoughReplicasAfterAppend).When used together, min.insync.replicas and acks " + -"allow you to enforce greater durability guarantees. A typical scenario would be to " + +"allow you to enforce greater durability guarantees. The leader and its followers are " + +"all considered replicas so a typical scenario would be to " + "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " + "produce with acks of \"all\". This will ensure that the producer raises an exception " + -"if a majority of replicas do not receive a write." +"if a majority of replicas (2) do not receive a write." Review comment: ```suggestion "if a majority of replicas, in this case 2, do not receive a write." ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
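For context, a producer configured for the durability scenario that doc passage describes (a topic created with replication factor 3 and min.insync.replicas=2); the bootstrap address is a placeholder:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the write succeeds only once all in-sync replicas have it.
        // With min.insync.replicas=2 on the topic, the producer gets an exception
        // if fewer than 2 replicas (the leader counts as one) received the write.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) calls go here
        }
    }
}
```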
[GitHub] [kafka] mumrah merged pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah merged pull request #11166: URL: https://github.com/apache/kafka/pull/11166 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah edited a comment on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892891735 Spoke with @vvcephei offline about the streams smoke test and it is a known flaky. We are good-to-go with enabling KRaft for this test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah commented on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892891735 Spoke with @vvcephei about the streams smoke test and it is a known flaky. We are good-to-go with enabling KRaft for this test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682871868 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -172,7 +172,7 @@ public void removeNamedTopology(final String topologyToRemove) { */ public void cleanUpNamedTopology(final String name) { if (getTopologyByName(name).isPresent()) { -throw new IllegalStateException("Can't clean up local state for an active NamedTopology"); +throw new IllegalStateException("Can't clean up local state for an active NamedTopology: "); Review comment: The topology was not included? :) ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon } } +/** + * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so + */ +void handleTopologyUpdates() { +tasks.maybeCreateTasksFromNewTopologies(); +for (final Task task : activeTaskIterable()) { +if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) { +task.freezeProcessing(); Review comment: Not sure I understand this logic... could you explain a bit? Also there's no unfreeze function, so once a task is frozen it would stick with that state forever? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java ## @@ -84,6 +86,15 @@ void setMainConsumer(final Consumer mainConsumer) { this.mainConsumer = mainConsumer; } +void handleNewAssignmentAndCreateTasks(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final Set assignedActiveTasks, + final Set assignedStandbyTasks) { +activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet())); Review comment: The diff between `assignedActiveTasks` and `activeTasksToCreate` is the set of tasks that are already owned and hence do not need to be assigned, am I reading that right (ditto for standbys)? Why should these be considered as revoked and removed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API
guozhangwang commented on a change in pull request #10788: URL: https://github.com/apache/kafka/pull/10788#discussion_r682861544 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java ## @@ -64,6 +65,9 @@ private final Map taskProducers; private final StreamThread.ProcessingMode processingMode; +// tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation +private final Map> unknownTasksToBeCreated = new HashMap<>(); Review comment: Yeah I left this comment before syncing with @ableegoldman offline, so it's better to clarify for others reading this PR :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13166) EOFException when Controller handles unknown API
David Arthur created KAFKA-13166: Summary: EOFException when Controller handles unknown API Key: KAFKA-13166 URL: https://issues.apache.org/jira/browse/KAFKA-13166 Project: Kafka Issue Type: Bug Reporter: David Arthur Assignee: David Arthur When ControllerApis handles an unsupported RPC, it silently drops the request due to an unhandled exception. The following stack trace was manually printed since this exception was suppressed on the controller. {code} java.util.NoSuchElementException: key not found: UpdateFeatures at scala.collection.MapOps.default(Map.scala:274) at scala.collection.MapOps.default$(Map.scala:273) at scala.collection.AbstractMap.default(Map.scala:405) at scala.collection.mutable.HashMap.apply(HashMap.scala:425) at kafka.network.RequestChannel$Metrics.apply(RequestChannel.scala:74) at kafka.network.RequestChannel.$anonfun$updateErrorMetrics$1(RequestChannel.scala:458) at kafka.network.RequestChannel.$anonfun$updateErrorMetrics$1$adapted(RequestChannel.scala:457) at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) at kafka.network.RequestChannel.updateErrorMetrics(RequestChannel.scala:457) at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:388) at kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:93) at kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:121) at kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:78) at kafka.server.ControllerApis.handle(ControllerApis.scala:116) at kafka.server.ControllerApis.$anonfun$handleEnvelopeRequest$1(ControllerApis.scala:125) at kafka.server.ControllerApis.$anonfun$handleEnvelopeRequest$1$adapted(ControllerApis.scala:125) at kafka.server.EnvelopeUtils$.handleEnvelopeRequest(EnvelopeUtils.scala:65) at kafka.server.ControllerApis.handleEnvelopeRequest(ControllerApis.scala:125) at kafka.server.ControllerApis.handle(ControllerApis.scala:103) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75) at java.lang.Thread.run(Thread.java:748) {code} This is due to a bug in the metrics code in RequestChannel. The result is that the request fails, but no indication is given that it was due to an unsupported API on either the broker, controller, or client. -- This message was sent by Atlassian Jira (v8.3.4#803005)
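The root cause is a strict map lookup for an API key that was never registered (Scala's mutable {{Map.apply}} throws for unknown keys). A hedged Java illustration of the failure mode and a defensive alternative; the map and key names are stand-ins, not Kafka's actual metrics code:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class MetricsLookupExample {
    public static void main(String[] args) {
        Map<String, Long> errorCountersByApi = new HashMap<>();
        errorCountersByApi.put("Fetch", 0L); // only supported APIs get registered

        // Scala's mutable Map.apply("UpdateFeatures") throws NoSuchElementException,
        // which is what swallowed the error response on the controller.
        Long strict = errorCountersByApi.get("UpdateFeatures"); // Java returns null instead
        // A defensive lookup keeps the response path alive for unknown APIs:
        long count = errorCountersByApi.getOrDefault("UpdateFeatures", 0L);
        System.out.println(strict + " / " + count); // null / 0
    }
}
{code}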
[GitHub] [kafka] rondagostino commented on a change in pull request #11175: MINOR: Fix getting started documentation
rondagostino commented on a change in pull request #11175: URL: https://github.com/apache/kafka/pull/11175#discussion_r682834005 ## File path: config/kraft/README.md ## @@ -14,8 +14,9 @@ Most important of all, KRaft mode is more scalable. We expect to be able to [su # Quickstart ## Warning -KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode. In fact, when Kafka 3.1 is released, -it may not be possible to upgrade your KRaft clusters from 3.0 to 3.1. There may be bugs, including serious ones. You should *assume that your data could be lost at any time* if you try the preview release of KRaft mode. +KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production. We do not yet support upgrading existing ZooKeeper-based Kafka clusters into KRaft mode. +It is not possible to upgrade KRaft clusters from 2.8 to 3.0. Upgrading KRaft clusters from 3.0 to 3.1 will be supported. There may be bugs, including serious ones. Review comment: > Upgrading KRaft clusters from 3.0 to 3.1 will be supported. Is that guaranteed to be true? Would it be better to say something like "Upgrading KRaft clusters from 3.0 to 3.1 is likely to be supported, but this is not guaranteed." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dielhennr closed pull request #11159: MINOR: Change default node id in kraft broker properties
dielhennr closed pull request #11159: URL: https://github.com/apache/kafka/pull/11159 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig
junrao commented on a change in pull request #11110: URL: https://github.com/apache/kafka/pull/11110#discussion_r682820398 ## File path: core/src/main/scala/kafka/log/LogConfig.scala ## @@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String] val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp) val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp) val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp) - val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp) - val localRetentionMs: Long = { -val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp) + class TieredLogConfig { Review comment: This probably should be named RemoteLogConfig to match RemoteLogManagerConfig? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
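A hedged sketch of the refactor under review, grouping the remote-storage settings behind one nested object (using the reviewer's suggested `RemoteLogConfig` name); the fields are illustrative, not the PR's exact set:

```java
// Illustrative only: a flat config exposing its remote-storage settings
// through a nested class instead of individual top-level fields.
public class LogConfigExample {
    public final RemoteLogConfig remoteLogConfig;

    public LogConfigExample(boolean remoteStorageEnable, long localRetentionMs) {
        this.remoteLogConfig = new RemoteLogConfig(remoteStorageEnable, localRetentionMs);
    }

    public static final class RemoteLogConfig {
        public final boolean remoteStorageEnable;
        public final long localRetentionMs;

        RemoteLogConfig(boolean remoteStorageEnable, long localRetentionMs) {
            this.remoteStorageEnable = remoteStorageEnable;
            this.localRetentionMs = localRetentionMs;
        }
    }
}
```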
[GitHub] [kafka] dielhennr commented on pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.
dielhennr commented on pull request #11141: URL: https://github.com/apache/kafka/pull/11141#issuecomment-892837165 @hachikuji @rajinisivaram since this build is green I’m fairly confident that dynamic config behavior hasn’t changed in a zk cluster. I am wondering if it is better to have buggy validation than no validation for KRaft brokers (assuming this is still buggy). Do either of you anticipate any problems with doing this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog
junrao commented on a change in pull request #11154: URL: https://github.com/apache/kafka/pull/11154#discussion_r682801773 ## File path: core/src/main/scala/kafka/log/UnifiedLog.scala ## @@ -248,16 +250,16 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason { * will be deleted to avoid ID conflicts upon re-upgrade. */ @threadsafe -class Log(@volatile var logStartOffset: Long, - private val localLog: LocalLog, - brokerTopicStats: BrokerTopicStats, - val producerIdExpirationCheckIntervalMs: Int, - @volatile var leaderEpochCache: Option[LeaderEpochFileCache], - val producerStateManager: ProducerStateManager, - @volatile private var _topicId: Option[Uuid], - val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { +class UnifiedLog(@volatile var logStartOffset: Long, + private val localLog: LocalLog, + brokerTopicStats: BrokerTopicStats, + val producerIdExpirationCheckIntervalMs: Int, + @volatile var leaderEpochCache: Option[LeaderEpochFileCache], + val producerStateManager: ProducerStateManager, + @volatile private var _topicId: Option[Uuid], + val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup { - import kafka.log.Log._ + import kafka.log.UnifiedLog._ Review comment: Should we rename the logging prefix to UnifiedLog too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #11150: MINOR: Fix missing word in LogLoader logged warning
ijuma merged pull request #11150: URL: https://github.com/apache/kafka/pull/11150 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates
cmccabe commented on pull request #11168: URL: https://github.com/apache/kafka/pull/11168#issuecomment-892828330 Thanks again for the PR. I left another comment. We need some kind of test for this. The simplest way to do it is probably to add an integration test for setting dynamic configs in `KRaftClusterTest.scala`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates
cmccabe commented on a change in pull request #11168: URL: https://github.com/apache/kafka/pull/11168#discussion_r682803659 ## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ## @@ -203,7 +203,15 @@ class BrokerMetadataPublisher(conf: KafkaConfig, } tag.foreach { t => val newProperties = newImage.configs().configProperties(configResource) - dynamicConfigHandlers(t).processConfigChanges(configResource.name(), newProperties) +val maybeDefaultName = if (conf.usesSelfManagedQuorum) { Review comment: BrokerMetadataPublisher is only used when in KRaft mode, so this check is not necessary. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11137: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest
cmccabe commented on a change in pull request #11137: URL: https://github.com/apache/kafka/pull/11137#discussion_r682796708 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java ## @@ -135,90 +133,88 @@ private final int generation = 5; private final String connector = "connector"; private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); +private Connector insConnector; -@MockStrict private Worker worker; -@MockStrict private WorkerConfigTransformer transformer; -@MockStrict private Plugins plugins; -@MockStrict private ClassLoader classLoader; -@MockStrict private ConfigBackingStore configStore; -@MockStrict private StatusBackingStore statusStore; +final private Worker worker = mock(Worker.class); +final private WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class); +final private Plugins plugins = mock(Plugins.class); +final private ClassLoader classLoader = mock(ClassLoader.class); +final private ConfigBackingStore configStore = mock(ConfigBackingStore.class); +final private StatusBackingStore statusStore = mock(StatusBackingStore.class); +private ClassLoader loader; + +@Before +public void before() { +loader = Utils.getContextOrKafkaClassLoader(); +} + +@After +public void tearDown() { +if (loader != null) Plugins.compareAndSwapLoaders(loader); +} @Test public void testConnectors() { -AbstractHerder herder = partialMockBuilder(AbstractHerder.class) -.withConstructor( -Worker.class, -String.class, -String.class, -StatusBackingStore.class, -ConfigBackingStore.class, -ConnectorClientConfigOverridePolicy.class -) -.withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) -.addMockedMethod("generation") -.createMock(); - -EasyMock.expect(herder.generation()).andStubReturn(generation); -EasyMock.expect(herder.rawConfig(connector)).andReturn(null); -EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT); -replayAll(); +//AbstractHerder herder = partialMockBuilder(AbstractHerder.class) Review comment: it seems like you intended to remove this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11137: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest
cmccabe commented on a change in pull request #11137: URL: https://github.com/apache/kafka/pull/11137#discussion_r682796708 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java ## @@ -135,90 +133,88 @@ private final int generation = 5; private final String connector = "connector"; private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy(); +private Connector insConnector; -@MockStrict private Worker worker; -@MockStrict private WorkerConfigTransformer transformer; -@MockStrict private Plugins plugins; -@MockStrict private ClassLoader classLoader; -@MockStrict private ConfigBackingStore configStore; -@MockStrict private StatusBackingStore statusStore; +final private Worker worker = mock(Worker.class); +final private WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class); +final private Plugins plugins = mock(Plugins.class); +final private ClassLoader classLoader = mock(ClassLoader.class); +final private ConfigBackingStore configStore = mock(ConfigBackingStore.class); +final private StatusBackingStore statusStore = mock(StatusBackingStore.class); +private ClassLoader loader; + +@Before +public void before() { +loader = Utils.getContextOrKafkaClassLoader(); +} + +@After +public void tearDown() { +if (loader != null) Plugins.compareAndSwapLoaders(loader); +} @Test public void testConnectors() { -AbstractHerder herder = partialMockBuilder(AbstractHerder.class) -.withConstructor( -Worker.class, -String.class, -String.class, -StatusBackingStore.class, -ConfigBackingStore.class, -ConnectorClientConfigOverridePolicy.class -) -.withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy) -.addMockedMethod("generation") -.createMock(); - -EasyMock.expect(herder.generation()).andStubReturn(generation); -EasyMock.expect(herder.rawConfig(connector)).andReturn(null); -EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT); -replayAll(); +//AbstractHerder herder = partialMockBuilder(AbstractHerder.class) Review comment: why is this present but commented out? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
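For reference, a hedged sketch of the Mockito idiom that replaces EasyMock's `partialMockBuilder(...).addMockedMethod(...)` pattern being migrated here; the `Herder` class below is a stand-in, not Kafka's `AbstractHerder`:

```java
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;

public class PartialMockExample {
    // Stand-in for a class like AbstractHerder: a real constructor plus one method to stub.
    public abstract static class Herder {
        protected final String workerId;
        protected Herder(String workerId) { this.workerId = workerId; }
        public int generation() { return -1; }
    }

    public static void main(String[] args) {
        // Real constructor, real methods by default, only generation() stubbed.
        Herder herder = mock(Herder.class, withSettings()
            .useConstructor("worker-1")
            .defaultAnswer(CALLS_REAL_METHODS));
        doReturn(5).when(herder).generation();
        System.out.println(herder.generation()); // 5
    }
}
```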
[GitHub] [kafka] cmccabe merged pull request #11130: KAFKA-13138: FileConfigProvider#get should keep failure exception
cmccabe merged pull request #11130: URL: https://github.com/apache/kafka/pull/11130 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11175: MINOR: Fix getting started documentation
cmccabe commented on a change in pull request #11175: URL: https://github.com/apache/kafka/pull/11175#discussion_r682789187 ## File path: config/kraft/broker.properties ## @@ -24,7 +24,7 @@ process.roles=broker # The node id associated with this instance's roles -node.id=1 +node.id=2 Review comment: This isn't strictly necessary because the id space for brokers and controllers is distinct. I guess it's fine to do this though, to avoid confusion? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #11175: MINOR: Fix getting started documentation
cmccabe commented on a change in pull request #11175: URL: https://github.com/apache/kafka/pull/11175#discussion_r682788755 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1725,7 +1725,14 @@ private long maybeSendRequest( return Long.MAX_VALUE; } -return connection.remainingRequestTimeMs(currentTimeMs); +long connectionWaitTimeMs = connection.remainingRequestTimeMs(currentTimeMs); +logger.trace( Review comment: can we use less vertical space here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio opened a new pull request #11175: MINOR: Fix getting started documentation
jsancio opened a new pull request #11175: URL: https://github.com/apache/kafka/pull/11175 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393313#comment-17393313 ] Colin McCabe commented on KAFKA-13165: -- The node ID space of controllers and brokers is disjoint. So we cannot find out anything useful by checking if the broker's node.id is in controller.quorum.voters. Brokers should be observers in the raft quorum, so I don't see how the broker becoming a leader could be a problem. Maybe this is a loophole we need to close. We should be able to tell the RaftClient whether it is an observer or a voter. > Validate node id, process role and quorum voters > > > Key: KAFKA-13165 > URL: https://issues.apache.org/jira/browse/KAFKA-13165 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: Jose Armando Garcia Sancio >Priority: Major > > Under certain configurations it is possible for the Kafka Server to boot up as a > broker only but still be the cluster metadata quorum leader. We should validate the > configuration to avoid this case. > # If the {{process.roles}} contains {{controller}} then the {{node.id}} > needs to be in the {{controller.quorum.voters}} > # If the {{process.roles}} doesn't contain {{controller}} then the > {{node.id}} cannot be in the {{controller.quorum.voters}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-13165) Validate node id, process role and quorum voters
[ https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393313#comment-17393313 ] Colin McCabe edited comment on KAFKA-13165 at 8/4/21, 4:38 PM: --- The node ID space of controllers and brokers is disjoint. So we cannot find out anything useful by checking if the broker's node.id is in controller.quorum.voters. Brokers should be observers in the raft quorum, so I don't see how the broker becoming a leader could possibly happen. Maybe this is a loophole we need to close. We should be able to tell the RaftClient whether it is an observer or a voter. was (Author: cmccabe): The node ID space of controllers and brokers is disjoint. So we cannot find out anything useful by checking if the broker's node.id is in controller.quorum.voters. Brokers should be observers in the raft quorum, so I don't see how the broker becoming a leader could be a problem. Maybe this is a loophole we need to close. We should be able to tell the RaftClient whether it is an observer or a voter. > Validate node id, process role and quorum voters > > > Key: KAFKA-13165 > URL: https://issues.apache.org/jira/browse/KAFKA-13165 > Project: Kafka > Issue Type: Sub-task > Components: kraft >Reporter: Jose Armando Garcia Sancio >Priority: Major > > Under certain configurations it is possible for the Kafka Server to boot up as a > broker only but still be the cluster metadata quorum leader. We should validate the > configuration to avoid this case. > # If the {{process.roles}} contains {{controller}} then the {{node.id}} > needs to be in the {{controller.quorum.voters}} > # If the {{process.roles}} doesn't contain {{controller}} then the > {{node.id}} cannot be in the {{controller.quorum.voters}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jsancio commented on a change in pull request #11159: MINOR: Change default node id in kraft broker properties
jsancio commented on a change in pull request #11159: URL: https://github.com/apache/kafka/pull/11159#discussion_r682775395 ## File path: core/src/main/scala/kafka/server/ControllerApis.scala ## @@ -552,7 +552,10 @@ class ControllerApis(val requestChannel: RequestChannel, def handleBrokerRegistration(request: RequestChannel.Request): Unit = { val registrationRequest = request.body[BrokerRegistrationRequest] authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) - +val controllerNode = controllerNodes.find(node => node.id == registrationRequest.brokerId) +if (controllerNode.isDefined) { + info(s"A broker is registering with ${KafkaConfig.NodeIdProp}=${controllerNode.get.id} which is also in use by a controller.") +} Review comment: I would remove this log message. This is a valid configuration and I don't think it is providing any additional information. ## File path: clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java ## @@ -57,6 +57,10 @@ public BrokerRegistrationRequestData data() { return data; } +public int brokerId() { +return data.brokerId(); +} + Review comment: I think this pattern undermines the advantages of the message JSON schema and the Java code generated from it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah edited a comment on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170 Streams smoke test runs: * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/ * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4631/ * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4636/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)
jolshan commented on a change in pull request #11171: URL: https://github.com/apache/kafka/pull/11171#discussion_r682739288 ## File path: core/src/main/scala/kafka/cluster/Partition.scala ## @@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition, } def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = { -isFutureReplica match { - case true if futureLog.isEmpty => -val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) +val logOpt = if (isFutureReplica) futureLog else log +if (logOpt.isEmpty) { + val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) + if (isFutureReplica) this.futureLog = Option(log) - case false if log.isEmpty => -val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId) + else this.log = Option(log) - case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.") +} else { + trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.") + logOpt.foreach { log => +if (log.topicId.isEmpty) { Review comment: I'm a little unsure what you mean by topicId is consistent here. (There would have been no log/topic ID in log to compare to). We can add an extra check in assign topic ID to ensure there is no way to assign inconsistently. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
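A hedged sketch of the "assign or verify" check mentioned in that reply, i.e. assign the topic ID when none is set, otherwise verify the incoming one matches; the holder class is invented for illustration:

```java
import org.apache.kafka.common.Uuid;

public class TopicIdHolder {
    private volatile Uuid topicId; // null until first assignment

    public void assignTopicId(Uuid receivedId) {
        if (topicId == null) {
            topicId = receivedId; // first assignment: nothing to compare against
        } else if (!topicId.equals(receivedId)) {
            throw new IllegalStateException(
                "Inconsistent topic ID: log has " + topicId + " but received " + receivedId);
        }
    }

    public static void main(String[] args) {
        TopicIdHolder holder = new TopicIdHolder();
        Uuid id = Uuid.randomUuid();
        holder.assignTopicId(id);                // assigns
        holder.assignTopicId(id);                // consistent, no-op
        holder.assignTopicId(Uuid.randomUuid()); // throws IllegalStateException
    }
}
```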
[jira] [Created] (KAFKA-13165) Validate node id, process role and quorum voters
Jose Armando Garcia Sancio created KAFKA-13165: -- Summary: Validate node id, process role and quorum voters Key: KAFKA-13165 URL: https://issues.apache.org/jira/browse/KAFKA-13165 Project: Kafka Issue Type: Sub-task Components: kraft Reporter: Jose Armando Garcia Sancio Under certain configurations it is possible for the Kafka Server to boot up as a broker only but still be the cluster metadata quorum leader. We should validate the configuration to avoid this case. # If the {{process.roles}} contains {{controller}} then the {{node.id}} needs to be in the {{controller.quorum.voters}} # If the {{process.roles}} doesn't contain {{controller}} then the {{node.id}} cannot be in the {{controller.quorum.voters}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo updated KAFKA-10413: --- Description: Hi, With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, if a connect instance disappears, or a new one appears, we're seeing unbalanced consumption, much like mentioned in this post: [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] This usually leads to one kafka connect instance taking most of the load and consumption not being able to keep up. Currently, we're "fixing" this by deleting the connector and re-creating it, but this is far from ideal. Any suggestion on what we could do to mitigate this? was: Hi, With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, if a connect instance disappears, or a new one appears, we're seeing unbalanced consumption, much like mentioned in this post: [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] This usually leads to one kafka connect instance taking most of the load and consumption not being able to keep up. Currently, we're "fixing" this by deleting the connector and re-creating it, but this is far from ideal. Any suggestion on what we could do to mitigate this? > rebalancing leads to unevenly balanced connectors > - > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1 >Reporter: yazgoo >Assignee: rameshkrishnan muthusamy >Priority: Major > Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > Attachments: connect_worker_balanced.png > > > Hi, > With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, > if a connect instance disappears, or a new one appears, we're seeing unbalanced > consumption, much like mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one kafka connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestion on what we could do to mitigate this? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-10413) rebalancing leads to unevenly balanced connectors
[ https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yazgoo reopened KAFKA-10413: Hi, I'm reopening this because I'm still seeing this issue with CP 6.2.0, which ships with 2.8.0, which is marked as a fixed version. We basically see the same behavior as mentioned in the issue description. > rebalancing leads to unevenly balanced connectors > - > > Key: KAFKA-10413 > URL: https://issues.apache.org/jira/browse/KAFKA-10413 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1 >Reporter: yazgoo >Assignee: rameshkrishnan muthusamy >Priority: Major > Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2 > > Attachments: connect_worker_balanced.png > > > Hi, > With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, > if a connect instance disappears, or a new one appears, we're seeing unbalanced > consumption, much like mentioned in this post: > [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors] > This usually leads to one kafka connect instance taking most of the load and > consumption not being able to keep up. > Currently, we're "fixing" this by deleting the connector and re-creating it, > but this is far from ideal. > Any suggestion on what we could do to mitigate this? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei edited a comment on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei edited a comment on pull request #10602: URL: https://github.com/apache/kafka/pull/10602#issuecomment-892354449 I went ahead and kicked off a full system test build to evaluate the other tests that got changed in this PR: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4633/console EDIT: Results: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-04--001.system-test-kafka-branch-builder--1628079409--kamalcph--KAFKA-12724--fbdce206b/report.html -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei edited a comment on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei edited a comment on pull request #10602: URL: https://github.com/apache/kafka/pull/10602#issuecomment-892707889 There were a number of failures in the prior run. Some were timeouts, and some of the Streams failures were due to the HA assignor giving only standbys to some nodes. I spot-checked by re-running a few of the tests locally and confirmed that they were flaky failures. I re-ran the tests (https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4635/) * with "spot instance" disabled * having added a change to the Streams tests intended to disable the HA assignor logic in favor of a one-shot balanced assignment. I'll keep an eye on that test run to make sure it's clean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei commented on pull request #10602: URL: https://github.com/apache/kafka/pull/10602#issuecomment-892707889 There were a number of failures in the prior run. Some were timeouts, and some of the Streams failures were due to the HA assignor giving only standbys to some nodes. I re-ran the tests (https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4635/) * with "spot instance" disabled * having added a change to the Streams tests intended to disable the HA assignor logic in favor of a one-shot balanced assignment. I'll keep an eye on that test run to make sure it's clean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah commented on a change in pull request #11166: URL: https://github.com/apache/kafka/pull/11166#discussion_r682654917 ## File path: tests/kafkatest/tests/core/replication_test.py ## @@ -122,14 +122,16 @@ def min_cluster_size(self): @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], security_protocol=["PLAINTEXT"], -enable_idempotence=[True]) +enable_idempotence=[True], +metadata_quorum=quorum.all_non_upgrade) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["leader"], security_protocol=["PLAINTEXT", "SASL_SSL"], metadata_quorum=quorum.all_non_upgrade) @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], broker_type=["controller"], -security_protocol=["PLAINTEXT", "SASL_SSL"]) +security_protocol=["PLAINTEXT", "SASL_SSL"], +metadata_quorum=quorum.all_non_upgrade) Review comment: Good catch @rondagostino. Looks like this was here as a pre-check to make sure the matrix args were correct. I'll fix this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah edited a comment on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170 Streams smoke test runs: * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/ * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4631/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft
mumrah edited a comment on pull request #11166: URL: https://github.com/apache/kafka/pull/11166#issuecomment-892067696 Replication test runs: * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4627/ * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4634/ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei commented on pull request #10602: URL: https://github.com/apache/kafka/pull/10602#issuecomment-892677574 Thanks @kamalcph . Sorry it took me so long to look into it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.
vvcephei commented on a change in pull request #10602: URL: https://github.com/apache/kafka/pull/10602#discussion_r682637061 ## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ## @@ -25,15 +25,17 @@ from kafkatest.services.zookeeper import ZookeeperService from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ -LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion +LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion # broker 0.10.0 is not compatible with newer Kafka Streams versions broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \ str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), \ - str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(DEV_BRANCH)] + str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)] metadata_1_versions = [str(LATEST_0_10_0)] metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] +metadata_3_10_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), Review comment: It's probably a typo, but it didn't matter, since this was the wrong test to add new versions to, anyway. I backed out these changes and added the missing versions to streams_application_upgrade_test.py. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] axrj commented on pull request #8906: KAFKA-10190: To set replication throttling configs at broker entity-default
axrj commented on pull request #8906: URL: https://github.com/apache/kafka/pull/8906#issuecomment-892663172 Hey, Is there any plan to merge this? It would be great to have this patch backported too if possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
akatona84 commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-892638494 I'm not sure who puts these lines in the final commit message, but please make sure to include Daniel in it too: ``` Co-authored-by: Daniel Urban ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology
Ralph Matthias Debusmann created KAFKA-13164: Summary: State store is attached to wrong node in the Kafka Streams topology Key: KAFKA-13164 URL: https://issues.apache.org/jira/browse/KAFKA-13164 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.8.0 Environment: local development (MacOS Big Sur 11.4) Reporter: Ralph Matthias Debusmann Attachments: 1.jpg, 3.jpg Hi, mjsax and I noticed a bug where a state store is attached to the wrong node in the Kafka Streams topology. The issue arose when I tried to read a topic into a KTable, then continued with a mapValues(), and then joined this KTable with a KStream, like so: var kTable = this.streamsBuilder.table().mapValues(); and then later: var joinedKStream = kstream.leftJoin(kTable, ); The join didn't work, and neither did it work when I added Materialized.as() to mapValues(), like so: var kTable = this.streamsBuilder.table().mapValues(, *Materialized.as()*); Interestingly, I could get the join to work when I first read the topic into a *KStream*, then continued with the mapValues(), then turned the KStream into a KTable, and then joined the KTable with the other KStream, like so: var kTable = this.streamsBuilder.stream().mapValues().toTable(); (the join worked the same as above) When mjsax and I had a look at the topology, we could see that in the former, non-working code, the state store (required for the join) is attached to the pre-final "KTABLE-SOURCE" node, and not the final "KTABLE-MAPVALUES" node (see attachment "1.jpg"). In the working code, the state store is (correctly) attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg"). Best regards, xdgrulez -- This message was sent by Atlassian Jira (v8.3.4#803005)
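To make the two topology variants concrete, here is a minimal, self-contained sketch of both. The topic names, serdes, and the toUpperCase mapper are assumptions for illustration, not the reporter's actual (elided) code:
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class MapValuesJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Variant 1 (reported as broken): table() followed by mapValues().
        // Per the report, the join's state store ends up attached to the
        // KTABLE-SOURCE node instead of the KTABLE-MAPVALUES node.
        KTable<String, String> broken = builder
                .table("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(v -> v.toUpperCase(),
                        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mapped-store"));

        // Variant 2 (reported as working): stream() + mapValues() + toTable().
        // Here the store is (correctly) attached to the KSTREAM-TOTABLE node.
        KTable<String, String> working = builder
                .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
                .mapValues(v -> v.toUpperCase())
                .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mapped-store-2"));

        // Join against the working variant; with the broken variant the
        // join does not produce the expected results.
        KStream<String, String> other = builder
                .stream("other-topic", Consumed.with(Serdes.String(), Serdes.String()));
        other.leftJoin(working, (left, right) -> left + "/" + right)
             .to("joined-topic", Produced.with(Serdes.String(), Serdes.String()));

        // Printing the topology shows which node each store is attached to,
        // matching the attached screenshots 1.jpg and 3.jpg.
        System.out.println(builder.build().describe());
    }
}
{code}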
[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
akatona84 commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-892623567 @kkonstantine , @rhauch could you take a look at this minor fix with a big impact? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration
[ https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393104#comment-17393104 ] Daniel Urban commented on KAFKA-10719: -- I believe that the core issue here is that the connectors will start up with the old config first, and then the new config gets applied by MM2. If the old listener is not available, that will cause timeouts in the rebalance. Connectors have a default timeout which exceeds the default rebalance timeout. When the rebalance timeout happens, none of the MM2 nodes will be able to update the config after the startup. Besides deleting the config topic, you can also try increasing the rebalance timeout of the Connect workers running inside MM2 (to something around 3 minutes), which allows the cluster to avoid the rebalance timeout and then successfully apply the new configs. > MirrorMaker2 fails to update its runtime configuration > -- > > Key: KAFKA-10719 > URL: https://issues.apache.org/jira/browse/KAFKA-10719 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.6.0 >Reporter: Peter Sinoros-Szabo >Priority: Major > > I was running the MM2 cluster successfully with the following configuration, > I simplified it a little: {code:java} clusters = main, backup > main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092 > backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092 > main->backup.enabled = true main->backup.topics = .*{code} I wanted to change > the bootstrap.address list of the destination cluster to a different list > that is pointing to the *same* cluster, just a different listener with a > different routing. So I changed it to: {code:java} backup.bootstrap.servers = > backupSecA:1234,backupSecB:1234,backupSecC:1234{code} I did a rolling restart > on the MM2 nodes and saw that some tasks were still using the old bootstrap > addresses, some of them were using the new one. I don't have the logs, so > unfortunately I don't know which one picked up the good values and which > didn't. I even stopped the cluster completely, but it didn't help. Ryanne > advised deleting the mm2-config and mm2-status topics, so I did delete those > on the destination cluster, which solved this issue. -- This message was sent by Atlassian Jira (v8.3.4#803005)
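For anyone trying the workaround above: the rebalance timeout is a plain worker-level setting, so the change amounts to something like the following sketch in the worker properties. The 3-minute value is the suggestion from the comment, not a tested recommendation, and this assumes the MM2-embedded workers honor the property the same way a standalone distributed Connect worker does:
{code}
# connect-distributed.properties (or the corresponding worker settings in MM2)
# Raise the rebalance timeout above the connector startup timeout so a slow
# connector start does not evict the worker from the group mid-rebalance.
rebalance.timeout.ms=180000
{code}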
[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.
[ https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393101#comment-17393101 ] Daniel Urban commented on KAFKA-9981: - This KIP aims to fix the issue by adding the REST API to MM2, and also improving the config provider reference handling in the MM2 configs: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters] > Running a dedicated mm2 cluster with more than one nodes,When the > configuration is updated the task is not aware and will lose the update > operation. > > > Key: KAFKA-9981 > URL: https://issues.apache.org/jira/browse/KAFKA-9981 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.4.0, 2.5.0, 2.4.1 >Reporter: victor >Priority: Major > > DistributedHerder.reconfigureConnector induction config update as follows: > {code:java} > if (changed) { > List> rawTaskProps = reverseTransform(connName, > configState, taskProps); > if (isLeader()) { > configBackingStore.putTaskConfigs(connName, rawTaskProps); > cb.onCompletion(null, null); > } else { > // We cannot forward the request on the same thread because this > reconfiguration can happen as a result of connector > // addition or removal. If we blocked waiting for the response from > leader, we may be kicked out of the worker group. > forwardRequestExecutor.submit(new Runnable() { > @Override > public void run() { > try { > String leaderUrl = leaderUrl(); > if (leaderUrl == null || leaderUrl.trim().isEmpty()) { > cb.onCompletion(new ConnectException("Request to > leader to " + > "reconfigure connector tasks failed " + > "because the URL of the leader's REST > interface is empty!"), null); > return; > } > String reconfigUrl = RestServer.urlJoin(leaderUrl, > "/connectors/" + connName + "/tasks"); > log.trace("Forwarding task configurations for connector > {} to leader", connName); > RestClient.httpRequest(reconfigUrl, "POST", null, > rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm); > cb.onCompletion(null, null); > } catch (ConnectException e) { > log.error("Request to leader to reconfigure connector > tasks failed", e); > cb.onCompletion(e, null); > } > } > }); > } > } > {code} > KafkaConfigBackingStore task checks for configuration updates,such as topic > whitelist update.If KafkaConfigBackingStore task is not running on leader > node,an HTTP request will be send to notify the leader of the configuration > update.However,dedicated mm2 cluster does not have the HTTP server turned > on,so the request will fail to be sent,causing the update operation to be > lost. -- This message was sent by Atlassian Jira (v8.3.4#803005)
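For context, once KIP-710 lands, enabling the internal REST endpoint in a dedicated MM2 cluster is expected to look roughly like the sketch below. This is based on the KIP text, not on any released version at the time of this comment, so the property names could still change:
{code}
# mm2.properties
# Enable the internal REST server so follower->leader task-config
# forwarding works in dedicated MirrorMaker 2.0 clusters.
dedicated.mode.enable.internal.rest = true
listeners = http://0.0.0.0:8080
{code}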
[jira] [Resolved] (KAFKA-9805) Running MirrorMaker in a Connect cluster,but the task not running
[ https://issues.apache.org/jira/browse/KAFKA-9805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andras Katona resolved KAFKA-9805. -- Resolution: Duplicate > Running MirrorMaker in a Connect cluster,but the task not running > - > > Key: KAFKA-9805 > URL: https://issues.apache.org/jira/browse/KAFKA-9805 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.1 > Environment: linux >Reporter: ZHAO GH >Priority: Major > Original Estimate: 5h > Remaining Estimate: 5h > > when i am Running MirrorMaker in a Connect clusterwhen i am Running > MirrorMaker in a Connect cluster,sometime the task running,but sometime the > task cannot assignment。 > I post connector config to connect cluster,here is my config > http://99.12.98.33:8083/connectorshttp://99.12.98.33:8083/connectors > { "name": "kafka->kafka241-3", > "config": { > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > "topics": "MM2-3", > "key.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter", > "tasks.max": 8, > "sasl.mechanism": "PLAIN", > "security.protocol": "SASL_PLAINTEXT", > "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule > required username=\"admin\" password=\"admin\";", > "source.cluster.alias": "kafka", "source.cluster.bootstrap.servers": > "55.13.104.70:9092,55.13.104.74:9092,55.13.104.126:9092", > "source.admin.sasl.mechanism": "PLAIN", > "source.admin.security.protocol": "SASL_PLAINTEXT", > "source.admin.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "target.cluster.alias": "kafka241", > "target.cluster.bootstrap.servers": > "55.14.111.22:9092,55.14.111.23:9092,55.14.111.25:9092", > "target.admin.sasl.mechanism": "PLAIN", > "target.admin.security.protocol": "SASL_PLAINTEXT", > "target.admin.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "producer.sasl.mechanism": "PLAIN", > "producer.security.protocol": "SASL_PLAINTEXT", > "producer.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "consumer.sasl.mechanism": "PLAIN", > "consumer.security.protocol": "SASL_PLAINTEXT", > "consumer.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "consumer.group.id": "mm2-1" > } > } > > but I get the connector status,found not tasks running > http://99.12.98.33:8083/connectors/kafka->kafka241-3/status > { > "name": "kafka->kafka241-3", > "connector": { > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > "tasks": [], > "type": "source" > } > > but sometime,the task run success > http://99.12.98.33:8083/connectors/kafka->kafka241-1/status > { > "name": "kafka->kafka241-1", > "connector": { > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > "tasks": [ > { > "id": 0, > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > { > "id": 1, > "state": "RUNNING", > "worker_id": "99.12.98.33:8083" > }, > { > "id": 2, > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > } > ], > "type": "source" > } > is somebody met this problem? how to fix it,is it a bug? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
akatona84 commented on pull request #11174: URL: https://github.com/apache/kafka/pull/11174#issuecomment-892618088 Thanks @urbandan for investigating this issue with me and creating the fix. Could you take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] akatona84 opened a new pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely
akatona84 opened a new pull request #11174: URL: https://github.com/apache/kafka/pull/11174 * The URL wasn't URL-encoded when forwarding the reconfiguration request to the leader Connect worker * Handles previously swallowed errors in the Connect RestClient ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
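The general shape of the fix is to percent-encode the connector name before it is placed into the request path. A minimal sketch of that idea, not necessarily how the patch itself does it (method and variable names are illustrative):
{code:java}
import java.net.URI;
import java.net.URISyntaxException;

public final class UrlSketch {
    // Builds the task-reconfiguration URL with the connector name
    // percent-encoded, so names like "qa-s3-sink-task|1" no longer
    // break URI parsing in the REST client.
    static String reconfigUrl(String leaderUrl, String connName) throws URISyntaxException {
        // The multi-argument URI constructor quotes characters that are
        // illegal in a path (e.g. '|' or spaces). Note that a '/' inside
        // the name would still need extra segment-level escaping.
        String path = new URI(null, null, "/connectors/" + connName + "/tasks", null).getRawPath();
        return leaderUrl.endsWith("/")
                ? leaderUrl.substring(0, leaderUrl.length() - 1) + path
                : leaderUrl + path;
    }

    public static void main(String[] args) throws URISyntaxException {
        // Prints: http://leader:8083/connectors/qa-s3-sink-task%7C1/tasks
        System.out.println(reconfigUrl("http://leader:8083", "qa-s3-sink-task|1"));
    }
}
{code}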
[jira] [Commented] (KAFKA-9805) Running MirrorMaker in a Connect cluster,but the task not running
[ https://issues.apache.org/jira/browse/KAFKA-9805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393080#comment-17393080 ] Daniel Urban commented on KAFKA-9805: - I believe this is the same issue as KAFKA-9747, see the answer from [~akatona] about the issue. > Running MirrorMaker in a Connect cluster,but the task not running > - > > Key: KAFKA-9805 > URL: https://issues.apache.org/jira/browse/KAFKA-9805 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.1 > Environment: linux >Reporter: ZHAO GH >Priority: Major > Original Estimate: 5h > Remaining Estimate: 5h > > when i am Running MirrorMaker in a Connect clusterwhen i am Running > MirrorMaker in a Connect cluster,sometime the task running,but sometime the > task cannot assignment。 > I post connector config to connect cluster,here is my config > http://99.12.98.33:8083/connectorshttp://99.12.98.33:8083/connectors > { "name": "kafka->kafka241-3", > "config": { > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > "topics": "MM2-3", > "key.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter", > "value.converter": > "org.apache.kafka.connect.converters.ByteArrayConverter", > "tasks.max": 8, > "sasl.mechanism": "PLAIN", > "security.protocol": "SASL_PLAINTEXT", > "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule > required username=\"admin\" password=\"admin\";", > "source.cluster.alias": "kafka", "source.cluster.bootstrap.servers": > "55.13.104.70:9092,55.13.104.74:9092,55.13.104.126:9092", > "source.admin.sasl.mechanism": "PLAIN", > "source.admin.security.protocol": "SASL_PLAINTEXT", > "source.admin.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "target.cluster.alias": "kafka241", > "target.cluster.bootstrap.servers": > "55.14.111.22:9092,55.14.111.23:9092,55.14.111.25:9092", > "target.admin.sasl.mechanism": "PLAIN", > "target.admin.security.protocol": "SASL_PLAINTEXT", > "target.admin.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "producer.sasl.mechanism": "PLAIN", > "producer.security.protocol": "SASL_PLAINTEXT", > "producer.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "consumer.sasl.mechanism": "PLAIN", > "consumer.security.protocol": "SASL_PLAINTEXT", > "consumer.sasl.jaas.config": > "org.apache.kafka.common.security.plain.PlainLoginModule required > username=\"admin\" password=\"admin\";", > "consumer.group.id": "mm2-1" > } > } > > but I get the connector status,found not tasks running > http://99.12.98.33:8083/connectors/kafka->kafka241-3/status > { > "name": "kafka->kafka241-3", > "connector": { > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > "tasks": [], > "type": "source" > } > > but sometime,the task run success > http://99.12.98.33:8083/connectors/kafka->kafka241-1/status > { > "name": "kafka->kafka241-1", > "connector": { > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > "tasks": [ > { > "id": 0, > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > }, > { > "id": 1, > "state": "RUNNING", > "worker_id": "99.12.98.33:8083" > }, > { > "id": 2, > "state": "RUNNING", > "worker_id": "99.12.98.34:8083" > } > ], > "type": "source" > } > is somebody met this problem? how to fix it,is it a bug? 
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393078#comment-17393078 ] Andras Katona commented on KAFKA-9747: -- The connector name contains a character which is considered an illegal character by HttpClient::newRequest {noformat} java.lang.IllegalArgumentException: Illegal character in path at index .. at java.net.URI.create(URI.java:852) at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:453) ... Caused by: java.net.URISyntaxException: Illegal character in path at index ... at java.net.URI$Parser.fail(URI.java:2848) at java.net.URI$Parser.checkChars(URI.java:3021) {noformat} > No tasks created for a connector > > > Key: KAFKA-9747 > URL: https://issues.apache.org/jira/browse/KAFKA-9747 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 > Environment: OS: Ubuntu 18.04 LTS > Platform: Confluent Platform 5.4 > HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge >Reporter: Vit Koma >Assignee: Andras Katona >Priority: Major > Attachments: connect-distributed.properties, connect.log > > > We are running Kafka Connect in a distributed mode on 3 nodes using Debezium > (MongoDB) and Confluent S3 connectors. When adding a new connector via the > REST API the connector is created in RUNNING state, but no tasks are created > for the connector. > Pausing and resuming the connector does not help. When we stop all workers > and then start them again, the tasks are created and everything runs as it > should. > The issue does not show up if we run only a single node. > The issue is not caused by the connector plugins, because we see the same > behaviour for both Debezium and S3 connectors. Also in debug logs I can see > that Debezium is correctly returning a task configuration from the > Connector.taskConfigs() method. 
> Connector configuration examples > Debezium: > {code} > { > "name": "qa-mongodb-comp-converter-task|1", > "config": { > "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", > "mongodb.hosts": > "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", > "mongodb.name": "qa-debezium-comp", > "mongodb.ssl.enabled": true, > "collection.whitelist": "converter[.]task", > "tombstones.on.delete": true > } > } > {code} > S3 Connector: > {code} > { > "name": "qa-s3-sink-task|1", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "topics": "qa-debezium-comp.converter.task", > "topics.dir": "data/env/qa", > "s3.region": "eu-west-1", > "s3.bucket.name": "", > "flush.size": "15000", > "rotate.interval.ms": "360", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": > "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "partitioner.class": > "io.confluent.connect.storage.partitioner.DefaultPartitioner", > "schema.compatibility": "NONE", > "key.converter": "org.apache.kafka.connect.json.JsonConverter", > "value.converter": "org.apache.kafka.connect.json.JsonConverter", > "key.converter.schemas.enable": false, > "value.converter.schemas.enable": false, > "transforms": "ExtractDocument", > > "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" > } > } > {code} > The connectors are created using curl: {{curl -X POST -H "Content-Type: > application/json" --data @ http:/:10083/connectors}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
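The exception is easy to reproduce outside of Connect; a standalone snippet with one of the connector names above (note the {{|}}) hits the same check:
{code:java}
import java.net.URI;

public class IllegalPathDemo {
    public static void main(String[] args) {
        // Throws java.lang.IllegalArgumentException:
        //   Illegal character in path at index ...
        // which is the same failure HttpClient::newRequest surfaces.
        URI.create("http://leader:8083/connectors/qa-s3-sink-task|1/tasks");
    }
}
{code}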
[jira] [Assigned] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andras Katona reassigned KAFKA-9747: Assignee: Andras Katona > No tasks created for a connector > > > Key: KAFKA-9747 > URL: https://issues.apache.org/jira/browse/KAFKA-9747 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0 > Environment: OS: Ubuntu 18.04 LTS > Platform: Confluent Platform 5.4 > HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge >Reporter: Vit Koma >Assignee: Andras Katona >Priority: Major > Attachments: connect-distributed.properties, connect.log > > > We are running Kafka Connect in a distributed mode on 3 nodes using Debezium > (MongoDB) and Confluent S3 connectors. When adding a new connector via the > REST API the connector is created in RUNNING state, but no tasks are created > for the connector. > Pausing and resuming the connector does not help. When we stop all workers > and then start them again, the tasks are created and everything runs as it > should. > The issue does not show up if we run only a single node. > The issue is not caused by the connector plugins, because we see the same > behaviour for both Debezium and S3 connectors. Also in debug logs I can see > that Debezium is correctly returning a task configuration from the > Connector.taskConfigs() method. > Connector configuration examples > Debezium: > {code} > { > "name": "qa-mongodb-comp-converter-task|1", > "config": { > "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", > "mongodb.hosts": > "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", > "mongodb.name": "qa-debezium-comp", > "mongodb.ssl.enabled": true, > "collection.whitelist": "converter[.]task", > "tombstones.on.delete": true > } > } > {code} > S3 Connector: > {code} > { > "name": "qa-s3-sink-task|1", > "config": { > "connector.class": "io.confluent.connect.s3.S3SinkConnector", > "topics": "qa-debezium-comp.converter.task", > "topics.dir": "data/env/qa", > "s3.region": "eu-west-1", > "s3.bucket.name": "", > "flush.size": "15000", > "rotate.interval.ms": "360", > "storage.class": "io.confluent.connect.s3.storage.S3Storage", > "format.class": > "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", > "schema.generator.class": > "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", > "partitioner.class": > "io.confluent.connect.storage.partitioner.DefaultPartitioner", > "schema.compatibility": "NONE", > "key.converter": "org.apache.kafka.connect.json.JsonConverter", > "value.converter": "org.apache.kafka.connect.json.JsonConverter", > "key.converter.schemas.enable": false, > "value.converter.schemas.enable": false, > "transforms": "ExtractDocument", > > "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" > } > } > {code} > The connectors are created using curl: {{curl -X POST -H "Content-Type: > application/json" --data @ http:/:10083/connectors}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9747) No tasks created for a connector
[ https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andras Katona updated KAFKA-9747: - Description: We are running Kafka Connect in a distributed mode on 3 nodes using Debezium (MongoDB) and Confluent S3 connectors. When adding a new connector via the REST API the connector is created in RUNNING state, but no tasks are created for the connector. Pausing and resuming the connector does not help. When we stop all workers and then start them again, the tasks are created and everything runs as it should. The issue does not show up if we run only a single node. The issue is not caused by the connector plugins, because we see the same behaviour for both Debezium and S3 connectors. Also in debug logs I can see that Debezium is correctly returning a task configuration from the Connector.taskConfigs() method. Connector configuration examples Debezium: {code} { "name": "qa-mongodb-comp-converter-task|1", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", "mongodb.name": "qa-debezium-comp", "mongodb.ssl.enabled": true, "collection.whitelist": "converter[.]task", "tombstones.on.delete": true } } {code} S3 Connector: {code} { "name": "qa-s3-sink-task|1", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "topics": "qa-debezium-comp.converter.task", "topics.dir": "data/env/qa", "s3.region": "eu-west-1", "s3.bucket.name": "", "flush.size": "15000", "rotate.interval.ms": "360", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "schema.compatibility": "NONE", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "transforms": "ExtractDocument", "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" } } {code} The connectors are created using curl: {{curl -X POST -H "Content-Type: application/json" --data @ http:/:10083/connectors}} was: We are running Kafka Connect in a distributed mode on 3 nodes using Debezium (MongoDB) and Confluent S3 connectors. When adding a new connector via the REST API the connector is created in RUNNING state, but no tasks are created for the connector. Pausing and resuming the connector does not help. When we stop all workers and then start them again, the tasks are created and everything runs as it should. The issue does not show up if we run only a single node. The issue is not caused by the connector plugins, because we see the same behaviour for both Debezium and S3 connectors. Also in debug logs I can see that Debezium is correctly returning a task configuration from the Connector.taskConfigs() method. 
Connector configuration examples Debezium: { "name": "qa-mongodb-comp-converter-task|1", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017", "mongodb.name": "qa-debezium-comp", "mongodb.ssl.enabled": true, "collection.whitelist": "converter[.]task", "tombstones.on.delete": true } } S3 Connector: { "name": "qa-s3-sink-task|1", "config": { "connector.class": "io.confluent.connect.s3.S3SinkConnector", "topics": "qa-debezium-comp.converter.task", "topics.dir": "data/env/qa", "s3.region": "eu-west-1", "s3.bucket.name": "", "flush.size": "15000", "rotate.interval.ms": "360", "storage.class": "io.confluent.connect.s3.storage.S3Storage", "format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat", "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator", "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner", "schema.compatibility": "NONE", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": false, "value.converter.schemas.enable": false, "transforms": "ExtractDocument", "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value" } } The connectors are created using curl: {{curl -X POST -H "Content-Type: application/json" --data @ http:/:10083/connectors}} > No tasks created for a connector > -
[jira] [Created] (KAFKA-13163) Issue with MySql worker sink connector
Muddam Pullaiah Yadav created KAFKA-13163: - Summary: Issue with MySql worker sink connector Key: KAFKA-13163 URL: https://issues.apache.org/jira/browse/KAFKA-13163 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.1.1 Environment: PreProd Reporter: Muddam Pullaiah Yadav Please help with the following issue. Really appreciate it! We are using Azure HDInsight Kafka cluster My sink Properties: cat mysql-sink-connector { "name":"mysql-sink-connector", "config": { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", "poll.interval.ms":"500", "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_grab_dev", "table.name":"db_grab_dev.tbl_clients_merchants", "topics":"test", "connection.user":"grabmod", "connection.password":"#admin", "auto.create":"true", "auto.evolve":"true", "value.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable":"false", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable":"true" } } [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177) org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514) at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491) at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226) at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: null at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743) at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363) at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128) at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162) ... 
13 more [2021-08-04 11:18:30,234] ERROR WorkerSinkTask\{id=mysql-sink-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178) [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, groupId=connect-mysql-sink-connector] Sending LeaveGroup request to coordinator wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 2147482646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782) -- This message was sent by Atlassian Jira (v8.3.4#803005)
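From the stack trace, the converter (not the connector) is rejecting the records: JsonConverter cannot derive a schema type for what it reads off the topic, which typically points at a mismatch between the {{schemas.enable}} settings and what is actually on the {{test}} topic (for example schema-less JSON, or null keys, with {{key.converter.schemas.enable=true}}). A first thing to try, assuming plain schema-less JSON on the topic (an assumption, not a verified fix for this setup):
{code}
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
{code}
Separately, the config posts {{connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector}} together with JDBC connection properties; a JDBC sink would normally use a JDBC sink connector class (e.g. Confluent's {{io.confluent.connect.jdbc.JdbcSinkConnector}}), so the connection and table settings above are being ignored by the file sink.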
[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes
jlprat commented on pull request #10784: URL: https://github.com/apache/kafka/pull/10784#issuecomment-892521275 Pinging @vvcephei. Would you be able to merge this before more commits get in the way and we need another rebase? Thanks a ton! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster
[ https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17392914#comment-17392914 ] Alexis Josephides commented on KAFKA-12468: --- Thanks for the suggestions, and apologies for the delay in updating how we handled this issue in the end. I should say from the outset that we did not completely remove this issue, but we minimised the occurrences, fixed some, and lived with the remainder. The first step was minimisation. We achieved this by phasing how we turned on our connectors. The first connector we applied was the `Source` connector. For our setup we had a number of source connectors, some set to replicate from `latest` and others from `earliest`. We let this connector run and replicate until we hit a steady state and all replication was confirmed to be at the head of the relevant topics. This soak could be a few days depending on your data volumes, throughputs (client limits), etc. Once the soak completed, we turned on the Checkpoint connector. If there were negative offsets after this first step, we took steps to manage them. There are two categories here: partitions that have data on them and partitions that have no data on them. In the first instance (data on partitions) the first thing we tried was to `delete` the affected consumer group. This is absolutely fine to do because a) there are no consumers on the target cluster yet, and b) the group is replicated again by MM2. In 90% of instances the negative offset was corrected. In the second instance (no data on partitions) the first thing we examined was whether we could publish data (on the source cluster) onto the topic to put data onto the partition. This was then followed by a refresh (delete) of the affected consumer group. This was possible only if the downstream consumer either handled dummy garbage messages OK or was fine with a small number of duplicate messages. What if, following the above, a negative offset remained? In the instance where there was zero data on a partition and no new data could be published to it, we let the consumer migrate onto the target cluster without much worry. The Kafka consumer at this point would look at a negative offset and throw a warning that it was out of range. It would then reset its offset on the cluster to its default setting: either consuming from `latest` or `earliest`. Since there is zero data on that partition, these are one and the same thing. For instances (rare, but they did occur) where there remained a negative offset and data on the partition, we still migrated and relied on the consumer behaviour to reset its offset to either `earliest` or `latest`. Depending on the consumer and its use case we picked whichever best suited the scenario. Hope this is helpful in some way to others who might be experiencing these issues. > Initial offsets are copied from source to target cluster > > > Key: KAFKA-12468 > URL: https://issues.apache.org/jira/browse/KAFKA-12468 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.7.0 >Reporter: Bart De Neuter >Priority: Major > > We have an active-passive setup where the 3 connectors from mirror maker 2 > (heartbeat, checkpoint and source) are running on a dedicated Kafka connect > cluster on the target cluster. > Offset syncing is enabled as specified by KIP-545. But when activated, it > seems the offsets from the source cluster are initially copied to the target > cluster without translation. This causes a negative lag for all synced > consumer groups. 
Only when we reset the offsets for each topic/partition on > the target cluster and produce a record on the topic/partition in the source, > the sync starts working correctly. > I would expect that the consumer groups are synced but that the current > offsets of the source cluster are not copied to the target cluster. > This is the configuration we are currently using: > Heartbeat connector > > {code:xml} > { > "name": "mm2-mirror-heartbeat", > "config": { > "name": "mm2-mirror-heartbeat", > "connector.class": > "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", > "source.cluster.alias": "eventador", > "target.cluster.alias": "msk", > "source.cluster.bootstrap.servers": "", > "target.cluster.bootstrap.servers": "", > "topics": ".*", > "groups": ".*", > "tasks.max": "1", > "replication.policy.class": "CustomReplicationPolicy", > "sync.group.offsets.enabled": "true", > "sync.group.offsets.interval.seconds": "5", > "emit.checkpoints.enabled": "true", > "emit.checkpoints.interval.seconds": "30", > "emit.heartbeats.interval.seconds": "30", > "key.converter": " > org.apache.kafka.connect.converters.ByteArra