[jira] [Commented] (KAFKA-13163) Issue with MySql worker sink connector

2021-08-04 Thread Muddam Pullaiah Yadav (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393600#comment-17393600
 ] 

Muddam Pullaiah Yadav commented on KAFKA-13163:
---

Can anyone help me with this issue?

> Issue with MySql worker sink connector
> --
>
> Key: KAFKA-13163
> URL: https://issues.apache.org/jira/browse/KAFKA-13163
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 2.1.1
> Environment: PreProd
>Reporter: Muddam Pullaiah Yadav
>Priority: Major
>
> Please help with the following issue. Really appreciate it! 
>  
> We are using an Azure HDInsight Kafka cluster.
> My sink Properties:
>  
> cat mysql-sink-connector
>  {
>  "name":"mysql-sink-connector",
>  "config":
> { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", 
> "poll.interval.ms":"500", 
> "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", 
> "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_test_dev",
>  "table.name":"db_test_dev.tbl_clients_merchants", "topics":"test", 
> "connection.user":"grabmod", "connection.password":"#admin", 
> "auto.create":"true", "auto.evolve":"true", 
> "value.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "value.converter.schemas.enable":"false", 
> "key.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "key.converter.schemas.enable":"true" }
> }
>  
> [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} 
> Task threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:177)
>  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in 
> error handler
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
>  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema 
> type: null
>  at 
> org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743)
>  at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>  ... 13 more
>  [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} 
> Task is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:178)
>  [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, 
> groupId=connect-mysql-sink-connector] Sending LeaveGroup request to 
> coordinator 
> wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 
> 2147482646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)
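
For context on the failure itself: "Unknown schema type: null" from JsonConverter commonly means the converter was configured with schemas.enable=true but the records do not carry the schema/payload envelope it expects. Here "key.converter.schemas.enable" is "true", so every record key would need to look roughly like the following sketch (field names are illustrative only, not taken from the reporter's data):

```json
{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "client_id", "type": "int32", "optional": false }
    ]
  },
  "payload": { "client_id": 42 }
}
```

Setting "key.converter.schemas.enable":"false" (as the config already does for values) is the usual alternative when keys are plain JSON. Separately, note that the config pairs JDBC sink options (connection.url, auto.create, ...) with "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector"; a JDBC sink would normally point connector.class at a JDBC sink connector class (for example io.confluent.connect.jdbc.JdbcSinkConnector, if that plugin is installed) rather than the file sink.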



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13163) Issue with MySql worker sink connector

2021-08-04 Thread Muddam Pullaiah Yadav (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muddam Pullaiah Yadav updated KAFKA-13163:
--
Reviewer: Ryan Dielhenn  (was: saurab)

> Issue with MySql worker sink connector
> --
>
> Key: KAFKA-13163
> URL: https://issues.apache.org/jira/browse/KAFKA-13163
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 2.1.1
> Environment: PreProd
>Reporter: Muddam Pullaiah Yadav
>Priority: Major
>
> Please help with the following issue. Really appreciate it! 
>  
> We are using an Azure HDInsight Kafka cluster.
> My sink Properties:
>  
> cat mysql-sink-connector
>  {
>  "name":"mysql-sink-connector",
>  "config":
> { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", 
> "poll.interval.ms":"500", 
> "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", 
> "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_test_dev",
>  "table.name":"db_test_dev.tbl_clients_merchants", "topics":"test", 
> "connection.user":"grabmod", "connection.password":"#admin", 
> "auto.create":"true", "auto.evolve":"true", 
> "value.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "value.converter.schemas.enable":"false", 
> "key.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "key.converter.schemas.enable":"true" }
> }
>  
> [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} 
> Task threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:177)
>  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in 
> error handler
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
>  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema 
> type: null
>  at 
> org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743)
>  at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>  ... 13 more
>  [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} 
> Task is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:178)
>  [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, 
> groupId=connect-mysql-sink-connector] Sending LeaveGroup request to 
> coordinator 
> wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 
> 2147482646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13163) Issue with MySql worker sink connector

2021-08-04 Thread Muddam Pullaiah Yadav (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muddam Pullaiah Yadav updated KAFKA-13163:
--
   Reviewer: saurab  (was: Ryan Dielhenn)
Description: 
Please help with the following issue. Really appreciate it! 

 

We are using an Azure HDInsight Kafka cluster.

My sink Properties:

 

cat mysql-sink-connector
 {
 "name":"mysql-sink-connector",
 "config":

{ "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", 
"poll.interval.ms":"500", 
"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", 
"connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_test_dev",
 "table.name":"db_test_dev.tbl_clients_merchants", "topics":"test", 
"connection.user":"grabmod", "connection.password":"#admin", 
"auto.create":"true", "auto.evolve":"true", 
"value.converter":"org.apache.kafka.connect.json.JsonConverter", 
"value.converter.schemas.enable":"false", 
"key.converter":"org.apache.kafka.connect.json.JsonConverter", 
"key.converter.schemas.enable":"true" }

}

 

[2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:177)
 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
handler
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: 
null
 at 
org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743)
 at 
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
 [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} 
Task is being killed and will not recover until manually restarted 
(org.apache.kafka.connect.runtime.WorkerTask:178)
 [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, 
groupId=connect-mysql-sink-connector] Sending LeaveGroup request to coordinator 
wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 
2147482646 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)

  was:
Please help with the following issue. Really appreciate it! 

 

We are using an Azure HDInsight Kafka cluster.

My sink Properties:

 

cat mysql-sink-connector
 {
 "name":"mysql-sink-connector",
 "config":

{ "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", 
"poll.interval.ms":"500", 
"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", 
"connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_grab_dev",
 "table.name":"db_grab_dev.tbl_clients_merchants", "topics":"test", 
"connection.user":"grabmod", "connection.password":"#admin", 
"auto.create":"true", "auto.evolve":"true", 
"value.converter":"org.apache.kafka.connect.json.JsonConverter", 
"value.converter.schemas.enable":"false", 
"key.converter":"org.apache.kafka.connect.json.JsonConverter", 
"key.converter.schemas.enable":"true" }

}

 

[2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} Task 
threw an uncaught and unrecoverable exception 
(org.apache.kafka.connect.runtime.WorkerTask:177)
 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
handler
 at 

[jira] [Updated] (KAFKA-13163) Issue with MySql worker sink connector

2021-08-04 Thread Muddam Pullaiah Yadav (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13163?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Muddam Pullaiah Yadav updated KAFKA-13163:
--
  Reviewer: Ryan Dielhenn
Issue Type: Task  (was: Bug)

> Issue with MySql worker sink connector
> --
>
> Key: KAFKA-13163
> URL: https://issues.apache.org/jira/browse/KAFKA-13163
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 2.1.1
> Environment: PreProd
>Reporter: Muddam Pullaiah Yadav
>Priority: Major
>
> Please help with the following issue. Really appreciate it! 
>  
> We are using an Azure HDInsight Kafka cluster.
> My sink Properties:
>  
> cat mysql-sink-connector
>  {
>  "name":"mysql-sink-connector",
>  "config":
> { "tasks.max":"2", "batch.size":"1000", "batch.max.rows":"1000", 
> "poll.interval.ms":"500", 
> "connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector", 
> "connection.url":"jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_grab_dev",
>  "table.name":"db_grab_dev.tbl_clients_merchants", "topics":"test", 
> "connection.user":"grabmod", "connection.password":"#admin", 
> "auto.create":"true", "auto.evolve":"true", 
> "value.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "value.converter.schemas.enable":"false", 
> "key.converter":"org.apache.kafka.connect.json.JsonConverter", 
> "key.converter.schemas.enable":"true" }
> }
>  
> [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} 
> Task threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask:177)
>  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in 
> error handler
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
>  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema 
> type: null
>  at 
> org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743)
>  at 
> org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
>  at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>  at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>  ... 13 more
>  [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} 
> Task is being killed and will not recover until manually restarted 
> (org.apache.kafka.connect.runtime.WorkerTask:178)
>  [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, 
> groupId=connect-mysql-sink-connector] Sending LeaveGroup request to 
> coordinator 
> wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 
> 2147482646 rack: null) 
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #11178: KAFKA-13168: KRaft observers should not have a replica id

2021-08-04 Thread GitBox


jsancio commented on a change in pull request #11178:
URL: https://github.com/apache/kafka/pull/11178#discussion_r683111844



##
File path: core/src/main/scala/kafka/raft/RaftManager.scala
##
@@ -180,6 +181,12 @@ class KafkaRaftManager[T](
 val expirationService = new TimingWheelExpirationService(expirationTimer)
 val quorumStateStore = new FileBasedStateStore(new File(dataDir, 
"quorum-state"))
 
+val nodeId = if (config.processRoles.contains(BrokerRole) && 
!config.processRoles.contains(ControllerRole)) {
+  OptionalInt.empty()
+} else {
+  OptionalInt.of(config.nodeId)
+}
+

Review comment:
   ```suggestion
   val nodeId = if (config.processRoles.contains(ControllerRole)) {
     OptionalInt.of(config.nodeId)
   } else {
     OptionalInt.empty()
   }
   
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-04 Thread GitBox


jsancio commented on a change in pull request #11179:
URL: https://github.com/apache/kafka/pull/11179#discussion_r683110083



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1949,6 +1951,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   )
 }
 
+val voterIds: Set[Integer] = if (usesSelfManagedQuorum) 
RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else 
Set.empty
+if (processRoles.contains(ControllerRole)) {
+  // Ensure that controllers use their node.id as a voter in 
controller.quorum.voters 
+  require(voterIds.contains(nodeId), s"The controller must contain a voter 
for it's ${KafkaConfig.NodeIdProp}=$nodeId in 
${RaftConfig.QUORUM_VOTERS_CONFIG}.")
+} else {
+  // Ensure that the broker's node.id is not an id in 
controller.quorum.voters
+  require(!voterIds.contains(nodeId), s"Since 
${KafkaConfig.ProcessRolesProp}=broker, the the broker's 
${KafkaConfig.NodeIdProp}=nodeId should not be in 
${RaftConfig.QUORUM_VOTERS_CONFIG}")

Review comment:
   How about "If ${process role config...} does not contain the 
'controller' role, the node id $nodeId must not be included in the set of 
voters ${QUORUM_VOTERS_CONFIG}=$votersIds"?

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1949,6 +1951,15 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: 
Boolean, dynamicConfigO
   )
 }
 
+val voterIds: Set[Integer] = if (usesSelfManagedQuorum) 
RaftConfig.parseVoterConnections(quorumVoters).asScala.keySet.toSet else 
Set.empty
+if (processRoles.contains(ControllerRole)) {
+  // Ensure that controllers use their node.id as a voter in 
controller.quorum.voters 
+  require(voterIds.contains(nodeId), s"The controller must contain a voter 
for it's ${KafkaConfig.NodeIdProp}=$nodeId in 
${RaftConfig.QUORUM_VOTERS_CONFIG}.")

Review comment:
   How about "If ${process role config...} contains the 'controller' 
role, the node id $nodeId must be included in the set of voters 
${QUORUM_VOTERS_CONFIG}=$votersIds"? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kamalcph commented on pull request #10259: MINOR: Provide valid examples in README page.

2021-08-04 Thread GitBox


kamalcph commented on pull request #10259:
URL: https://github.com/apache/kafka/pull/10259#issuecomment-893137436


   @ijuma Can you please review and merge this patch? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #11153: MINOR: Port fix to other StoreQueryIntegrationTests

2021-08-04 Thread GitBox


wcarlson5 commented on pull request #11153:
URL: https://github.com/apache/kafka/pull/11153#issuecomment-893137087


   @vvcephei looks like it is working now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683102378



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability

Review comment:
   Well, I removed the whole `assignedNamedTopologies` thing from this PR, but yes, that was one of the original intentions of adding it to the AssignmentInfo. I'm leaning towards keeping things simple for now and avoiding additions to the AssignmentInfo where possible. We can revisit this if the extra rebalances become a problem.
   
   It could be sufficient to just use the set of actually-assigned topologies 
as a proxy. It's not perfect since it could be that the leader was aware of a 
topology and simply happened to not assign any of its tasks to this client, but 
I'd bet this gets rid of most of the unnecessary rebalances.
   
   That said, I'm saving this optimization for a followup Pt. 4 PR. I'll 
revisit the best way to handle the `builders` map in that as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683096288



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##
@@ -67,13 +72,31 @@
 public class NamedTopologyIntegrationTest {
 public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
 
+// TODO KAFKA-12648:

Review comment:
   I'm still filling out the integration test suite, especially the 
multi-node testing, but I'll make sure this scenario has coverage. This will 
probably have to be in the followup Pt. 4 which expands 
`add/removeNamedTopology` to return a `Future`, since being able to block on 
this helps a lot with the testing.
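
   As a rough sketch of what a Future-returning add/removeNamedTopology could look like (hypothetical signatures only, not the final Pt. 4 API):

   ```java
   import java.util.concurrent.CompletableFuture;

   public interface NamedTopologyClientSketch {
       // Hypothetical: completes once the added topology's tasks are assigned and running.
       CompletableFuture<Void> addNamedTopology(String name);

       // Hypothetical: completes once all tasks of the removed topology are closed.
       CompletableFuture<Void> removeNamedTopology(String name);
   }
   ```

   A test could then block on addNamedTopology("topology-1").get(30, TimeUnit.SECONDS) until the update has taken effect.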




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683095231



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -867,6 +873,36 @@ private void initializeAndRestorePhase() {
 log.debug("Idempotent restore call done. Thread state has not 
changed.");
 }
 
+private void handleTopologyUpdatesPhase() {
+// Check if the topology has been updated since we last checked, ie 
via #addNamedTopology or #removeNamedTopology
+// or if this is the very first topology in which case we may need to 
wait for it to be non-empty
+if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || 
lastSeenTopologyVersion == 0) {
+try {

Review comment:
   I refactored this part of the code quite a bit, let me know if you have 
any remaining concerns with the new logic




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr opened a new pull request #11179: KAFKA-13165: Validate node id, process role and quorum voters

2021-08-04 Thread GitBox


dielhennr opened a new pull request #11179:
URL: https://github.com/apache/kafka/pull/11179


   Under certain configurations it is possible for the Kafka server to boot up as a 
broker only but be the cluster metadata quorum leader. We should validate the 
configuration to avoid this case.
   
   https://issues.apache.org/jira/browse/KAFKA-13165
   
   Tested manually by starting up a broker and a controller both with 
valid/invalid configurations 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#issuecomment-893124886


   @guozhangwang I think I've addressed all your feedback and significantly 
cleaned up the streamthread event loop + topology locking, let me know if 
there's anything else


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-04 Thread GitBox


ccding commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r683068699



##
File path: core/src/main/scala/kafka/log/LogConfig.scala
##
@@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], 
overriddenConfigs: Set[String]
   val LeaderReplicationThrottledReplicas = 
getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = 
getList(LogConfig.FollowerReplicationThrottledReplicasProp)
   val messageDownConversionEnable = 
getBoolean(LogConfig.MessageDownConversionEnableProp)
-  val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
 
-  val localRetentionMs: Long = {
-val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
+  class TieredLogConfig {

Review comment:
   Done. Thanks for the comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr opened a new pull request #11178: KAFKA-13168: KRaft observers should not have a replica id

2021-08-04 Thread GitBox


dielhennr opened a new pull request #11178:
URL: https://github.com/apache/kafka/pull/11178


   Fix the `KafkaRaftClient` to use Optional.empty as its localId so that the sentinel `node.id` value `-1` is sent as the replicaId in the `FetchRequest`.
   
   https://issues.apache.org/jira/browse/KAFKA-13168
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683065432



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -53,44 +56,162 @@
 private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
 private final StreamsConfig config;
-private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
+private final TopologyVersion version;
+
+private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
 
 private ProcessorTopology globalTopology;
-private Map<String, StateStore> globalStateStores = new HashMap<>();
-final Set<String> allInputTopics = new HashSet<>();
+private final Map<String, StateStore> globalStateStores = new HashMap<>();
+private final Set<String> allInputTopics = new HashSet<>();
+
+public static class TopologyVersion {
+public AtomicLong topologyVersion = new AtomicLong(0L); // the local 
topology version
+public Set<String> assignedNamedTopologies = new HashSet<>(); // the named topologies whose tasks are actively assigned
+public ReentrantLock topologyLock = new ReentrantLock();
+public Condition topologyCV = topologyLock.newCondition();
+}
 
-public TopologyMetadata(final InternalTopologyBuilder builder, final 
StreamsConfig config) {
+public TopologyMetadata(final InternalTopologyBuilder builder,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
-builders = new TreeMap<>();
+builders = new ConcurrentSkipListMap<>();
 if (builder.hasNamedTopology()) {
 builders.put(builder.topologyName(), builder);
 } else {
 builders.put(UNNAMED_TOPOLOGY, builder);
 }
 }
 
-public TopologyMetadata(final SortedMap<String, InternalTopologyBuilder> builders, final StreamsConfig config) {
+public TopologyMetadata(final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders,
+final StreamsConfig config) {
+version = new TopologyVersion();
 this.config = config;
+
 this.builders = builders;
 if (builders.isEmpty()) {
-log.debug("Building KafkaStreams app with no empty topology");
+log.debug("Starting up empty KafkaStreams app with no topology");
 }
 }
 
-public int getNumStreamThreads(final StreamsConfig config) {
-final int configuredNumStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+public void updateCurrentAssignmentTopology(final Set<String> assignedNamedTopologies) {
+version.assignedNamedTopologies = assignedNamedTopologies;
+}
 
-// If the application uses named topologies, it's possible to start up 
with no topologies at all and only add them later
-if (builders.isEmpty()) {
-if (configuredNumStreamThreads != 0) {
-log.info("Overriding number of StreamThreads to zero for empty 
topology");
+/**
+ * @return the set of named topologies that the assignor distributed tasks 
for during the last rebalance
+ */
+public Set<String> assignmentNamedTopologies() {
+return version.assignedNamedTopologies;
+}
+
+public long topologyVersion() {
+return version.topologyVersion.get();
+}
+
+public void lock() {
+version.topologyLock.lock();
+}
+
+public void unlock() {
+version.topologyLock.unlock();
+}
+
+public InternalTopologyBuilder getBuilderForTopologyName(final String 
name) {
+return builders.get(name);
+}
+
+/**
+ * @throws IllegalStateException if the thread is not already holding the 
lock via TopologyMetadata#lock
+ */
+public void maybeWaitForNonEmptyTopology() {
+if (!version.topologyLock.isHeldByCurrentThread()) {

Review comment:
   Agreed, let me clean this up




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683057288



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition 
topicPartition, final Lon
 }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any 
assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+tasks.maybeCreateTasksFromNewTopologies();
+for (final Task task : activeTaskIterable()) {
+if 
(topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+task.freezeProcessing();

Review comment:
   This is another stupid bug... it should be freezing them if they _don't_ appear in the current topology. Tasks from removed named topologies are frozen to prevent them from processing any more records, so yes, it is meant to be permanent.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


ableegoldman commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r683055067



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
 this.mainConsumer = mainConsumer;
 }
 
+void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+   final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+   final Set<TaskId> assignedActiveTasks,
+   final Set<TaskId> assignedStandbyTasks) {
+activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
   Whoops, that's what happens when you push changes late at night. I'll fix it up... hopefully it will make more sense then.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-04 Thread GitBox


jolshan commented on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893071380


   Ok, tested locally (with the correct jar this time) and it looked good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan removed a comment on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-04 Thread GitBox


jolshan removed a comment on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311


   Just tested locally and something is still wrong here. Need to follow up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan edited a comment on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-04 Thread GitBox


jolshan edited a comment on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311


   Just tested locally and something is still wrong here. Need to follow up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-04 Thread GitBox


jolshan commented on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311


   Just tested and something is still wrong here. Need to follow up.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13168) KRaft observers should not have a replica id

2021-08-04 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn reassigned KAFKA-13168:
-

Assignee: Ryan Dielhenn

> KRaft observers should not have a replica id
> 
>
> Key: KAFKA-13168
> URL: https://issues.apache.org/jira/browse/KAFKA-13168
> Project: Kafka
>  Issue Type: Bug
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Ryan Dielhenn
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> To avoid a misconfigured broker affecting the quorum of the cluster metadata 
> partition, when a Kafka node is configured as broker-only, the replica id 
> for the KRaft client should be set to {{Optional::empty()}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Ryan Dielhenn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryan Dielhenn reassigned KAFKA-13165:
-

Assignee: Ryan Dielhenn

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Assignee: Ryan Dielhenn
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Under certain configurations it is possible for the Kafka server to boot up as a 
> broker only but be the cluster metadata quorum leader. We should validate the 
> configuration to avoid this case.
>  # If the {{process.roles}} contains {{controller}} then the {{node.id}} 
> needs to be in the {{controller.quorum.voters}}
>  # If the {{process.roles}} doesn't contain {{controller}} then the 
> {{node.id}} cannot be in the {{controller.quorum.voters}}
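
A minimal, self-contained Java sketch of these two rules (names and message wording are hypothetical; the actual check is implemented in Scala in KafkaConfig, quoted earlier in this digest):

```java
import java.util.Set;

public class QuorumVoterValidationSketch {
    // Sketch of the two rules above; method name and exceptions are illustrative only.
    static void validate(int nodeId, Set<String> processRoles, Set<Integer> voterIds) {
        if (processRoles.contains("controller")) {
            if (!voterIds.contains(nodeId)) {
                throw new IllegalArgumentException(
                    "node.id=" + nodeId + " must be in controller.quorum.voters=" + voterIds);
            }
        } else if (voterIds.contains(nodeId)) {
            throw new IllegalArgumentException(
                "node.id=" + nodeId + " must not be in controller.quorum.voters=" + voterIds);
        }
    }

    public static void main(String[] args) {
        validate(1, Set.of("controller"), Set.of(1, 2, 3)); // ok: controller id is a voter
        validate(4, Set.of("broker"), Set.of(1, 2, 3));     // ok: broker id not among voters
        // validate(2, Set.of("broker"), Set.of(1, 2, 3));  // would throw: broker id is a voter
    }
}
```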



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12724) Add 2.8.0 to system tests and streams upgrade tests

2021-08-04 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393508#comment-17393508
 ] 

Konstantine Karantasis commented on KAFKA-12724:


The PR got merged, so I'm resolving this issue. Thanks [~vvcephei] and 
[~ckamal] 

If any issues persist in system tests we can always reopen this ticket. 

> Add 2.8.0 to system tests and streams upgrade tests
> ---
>
> Key: KAFKA-12724
> URL: https://issues.apache.org/jira/browse/KAFKA-12724
> Project: Kafka
>  Issue Type: Task
>  Components: streams, system tests
>Reporter: Kamal Chandraprakash
>Assignee: Kamal Chandraprakash
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Kafka v2.8.0 is released. We should add this version to the system tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13160) Fix the code that calls the broker’s config handler to pass the expected default resource name when using KRaft.

2021-08-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13160.
--
Resolution: Fixed

> Fix the code that calls the broker’s config handler to pass the expected 
> default resource name when using KRaft.
> 
>
> Key: KAFKA-13160
> URL: https://issues.apache.org/jira/browse/KAFKA-13160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Ryan Dielhenn
>Assignee: Ryan Dielhenn
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In a ZK cluster, dynamic default broker configs are stored in the zNode 
> /brokers/<default>. Without this fix, when dynamic configs from snapshots are 
> processed by the KRaft brokers, the BrokerConfigHandler checks if the 
> resource name is "<default>" to do a default update and converts the resource 
> name to an integer otherwise to do a per-broker config update.
> In KRaft, dynamic default broker configs are serialized in metadata with 
> empty string instead of "<default>". This was causing the BrokerConfigHandler 
> to throw a NumberFormatException for dynamic default broker configs since the 
> resource name for them is not "<default>" or a single integer. The code that 
> calls the handler method for config changes should be fixed to pass 
> "<default>" instead of empty string to the handler method if using KRaft.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] joel-hamill closed pull request #4536: [WIP] Migrate HTML to RST

2021-08-04 Thread GitBox


joel-hamill closed pull request #4536:
URL: https://github.com/apache/kafka/pull/4536


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates

2021-08-04 Thread GitBox


cmccabe merged pull request #11168:
URL: https://github.com/apache/kafka/pull/11168


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-04 Thread GitBox


jolshan commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r683019982



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+// defensively check that any newly assign topic ID matches any that is 
already set
+_topicId.foreach { current =>
+  if (!current.equals(topicId))

Review comment:
   We can. I was just looking at this case again. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-04 Thread GitBox


hachikuji commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r683019148



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+// defensively check that any newly assign topic ID matches any that is 
already set
+_topicId.foreach { current =>
+  if (!current.equals(topicId))

Review comment:
   Can we shortcut return if the current topicId is already defined and 
matches the provided topicId?

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on 
existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+// defensively check that any newly assign topic ID matches any that is 
already set
+_topicId.foreach { current =>
+  if (!current.equals(topicId))
+  // we should never get here as the topic IDs should have been checked in 
becomeLeaderOrFollower

Review comment:
   nit: fix alignment (just use braces. I won't tell anyone)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #11177: KAFKA-13167; KRaft broker should send heartbeat immediately after starting controlled shutdown

2021-08-04 Thread GitBox


hachikuji merged pull request #11177:
URL: https://github.com/apache/kafka/pull/11177


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13168) KRaft observers should not have a replica id

2021-08-04 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13168:
--

 Summary: KRaft observers should not have a replica id
 Key: KAFKA-13168
 URL: https://issues.apache.org/jira/browse/KAFKA-13168
 Project: Kafka
  Issue Type: Bug
  Components: kraft
Reporter: Jose Armando Garcia Sancio
 Fix For: 3.0.0


To avoid a misconfigured broker affecting the quorum of the cluster metadata 
partition, when a Kafka node is configured as broker-only, the replica id 
for the KRaft client should be set to {{Optional::empty()}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13165:
---
Labels: kip-500  (was: )

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
>  Labels: kip-500
> Fix For: 3.0.0
>
>
> Under certain configurations it is possible for the Kafka server to boot up as a 
> broker only but be the cluster metadata quorum leader. We should validate the 
> configuration to avoid this case.
>  # If the {{process.roles}} contains {{controller}} then the {{node.id}} 
> needs to be in the {{controller.quorum.voters}}
>  # If the {{process.roles}} doesn't contain {{controller}} then the 
> {{node.id}} cannot be in the {{controller.quorum.voters}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13165:
---
Fix Version/s: 3.0.0

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Under certain configurations it is possible for the Kafka server to boot up as a 
> broker only but be the cluster metadata quorum leader. We should validate the 
> configuration to avoid this case.
>  # If the {{process.roles}} contains {{controller}} then the {{node.id}} 
> needs to be in the {{controller.quorum.voters}}
>  # If the {{process.roles}} doesn't contain {{controller}} then the 
> {{node.id}} cannot be in the {{controller.quorum.voters}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13165:
---
Priority: Blocker  (was: Major)

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Priority: Blocker
>
> Under certain configurations it is possible for the Kafka server to boot up as a 
> broker only but be the cluster metadata quorum leader. We should validate the 
> configuration to avoid this case.
>  # If the {{process.roles}} contains {{controller}} then the {{node.id}} 
> needs to be in the {{controller.quorum.voters}}
>  # If the {{process.roles}} doesn't contain {{controller}} then the 
> {{node.id}} cannot be in the {{controller.quorum.voters}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17393486#comment-17393486
 ] 

Jose Armando Garcia Sancio commented on KAFKA-13165:


We discussed this offline. We decided to have controllers and brokers share the 
same id space. In other words, it is expected that the id of a controller-only 
node doesn't conflict with the id of a broker-only node.

I am going to make the validation described in this Jira a blocker for 3.0.

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> Under certain configurations it is possible for the Kafka server to boot up as a 
> broker only but be the cluster metadata quorum leader. We should validate the 
> configuration to avoid this case.
>  # If the {{process.roles}} contains {{controller}} then the {{node.id}} 
> needs to be in the {{controller.quorum.voters}}
>  # If the {{process.roles}} doesn't contain {{controller}} then the 
> {{node.id}} cannot be in the {{controller.quorum.voters}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-04 Thread GitBox


vvcephei merged pull request #10602:
URL: https://github.com/apache/kafka/pull/10602


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-04 Thread GitBox


vvcephei commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-893016769


   There were still 21 test failures, but they do not look related to this 
change. There are also fixes ongoing in parallel to other tests, so I'm going 
to go ahead and merge this one. If it turns out to be the cause of new 
problems, we should just revert it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #11177: KAFKA-13167; KRaft broker should send heartbeat immediately after starting controlled shutdown

2021-08-04 Thread GitBox


hachikuji opened a new pull request #11177:
URL: https://github.com/apache/kafka/pull/11177


   Controlled shutdown in KRaft is signaled through a heartbeat request with 
the `shouldShutDown` flag set to true. When we begin controlled shutdown, we 
should immediately schedule the next heartbeat instead of waiting for the next 
periodic heartbeat. This allows the broker to shut down more quickly.
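
   For illustration, a minimal sketch of the scheduling idea (hypothetical class and method names, not the actual BrokerLifecycleManager code):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: once the heartbeat response tells the broker to shut
// down, schedule the next heartbeat with zero delay instead of waiting out
// the normal periodic interval.
public class HeartbeatScheduler {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final long heartbeatIntervalMs = 2000L; // assumed periodic interval

    void onHeartbeatResponse(boolean shouldShutDown) {
        long delayMs = shouldShutDown ? 0L : heartbeatIntervalMs;
        executor.schedule(this::sendHeartbeat, delayMs, TimeUnit.MILLISECONDS);
    }

    private void sendHeartbeat() {
        System.out.println("sending broker heartbeat");
    }
}
```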
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11151: MINOR: Should commit a task if the consumer position advanced as well

2021-08-04 Thread GitBox


guozhangwang commented on a change in pull request #11151:
URL: https://github.com/apache/kafka/pull/11151#discussion_r682935981



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -1158,7 +1158,38 @@ public String toString(final String indent) {
 
     @Override
     public boolean commitNeeded() {
-        return commitNeeded;
+        // we need to do an extra check if the flag was false, that
+        // if the consumer position has been updated; this is because
+        // there may be non data records such as control markers bypassed
+        if (commitNeeded) {
+            return true;
+        } else {
+            for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+                final TopicPartition partition = entry.getKey();
+                try {
+                    final long offset = mainConsumer.position(partition);
+
+                    // note the position in consumer is the "next" record to fetch,
+                    // so it should be larger than the consumed offset by 1; if it is
+                    // more than 1 it means there are skipped offsets
Review comment:
   ack
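
   For readers following along, a small stand-alone demo of the `position()` semantics the diff comment relies on (placeholder bootstrap server and topic):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// Demo only: after consuming offset N, position() returns the next offset to
// fetch (normally N + 1); a gap larger than 1 means records such as
// transaction control markers were skipped over.
public class PositionSemanticsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "position-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("demo-topic", 0); // placeholder topic
            consumer.assign(List.of(tp));
            consumer.poll(Duration.ofSeconds(1)).forEach(r ->
                System.out.println("consumed offset " + r.offset()));
            System.out.println("next offset to fetch: " + consumer.position(tp));
        }
    }
}
```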




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang edited a comment on pull request #11155: MINOR: Fix flaky shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound

2021-08-04 Thread GitBox


guozhangwang edited a comment on pull request #11155:
URL: https://github.com/apache/kafka/pull/11155#issuecomment-890272874


   ping @cadonna @ableegoldman for reviews.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12935) Flaky Test RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2021-08-04 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson reassigned KAFKA-12935:
--

Assignee: Walker Carlson

> Flaky Test 
> RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore
> 
>
> Key: KAFKA-12935
> URL: https://issues.apache.org/jira/browse/KAFKA-12935
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Walker Carlson
>Priority: Critical
>  Labels: flaky-test
>
> {quote}java.lang.AssertionError: Expected: <0L> but: was <5005L> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore(RestoreIntegrationTest.java:374)
> {quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 opened a new pull request #11176: MINOR: use relative counts for shouldRecycleStateFromStandbyTaskPromotedToActiveTaskAndNotRestore

2021-08-04 Thread GitBox


wcarlson5 opened a new pull request #11176:
URL: https://github.com/apache/kafka/pull/11176


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13167) KRaft broker should heartbeat immediately during controlled shutdown

2021-08-04 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13167:
---

 Summary: KRaft broker should heartbeat immediately during 
controlled shutdown
 Key: KAFKA-13167
 URL: https://issues.apache.org/jira/browse/KAFKA-13167
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Controlled shutdown in KRaft is signaled through a heartbeat request with the 
`shouldShutDown` flag set to true. When we begin controlled shutdown, we should 
immediately schedule the next heartbeat instead of waiting for the next 
periodic heartbeat so that we can shut down more quickly. Otherwise controlled 
shutdown can be delayed by several seconds.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] joel-hamill commented on pull request #11163: MINOR: doc change for minisr to clarify replicas in Kafka Config

2021-08-04 Thread GitBox


joel-hamill commented on pull request #11163:
URL: https://github.com/apache/kafka/pull/11163#issuecomment-892895687


   LGTM, had one comment


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] joel-hamill commented on a change in pull request #11163: MINOR: doc change for minisr to clarify replicas in Kafka Config

2021-08-04 Thread GitBox


joel-hamill commented on a change in pull request #11163:
URL: https://github.com/apache/kafka/pull/11163#discussion_r682878756



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -826,10 +826,11 @@ object KafkaConfig {
     "a write for the write to be considered successful. If this minimum cannot be met, " +
     "then the producer will raise an exception (either NotEnoughReplicas or " +
     "NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and acks " +
-    "allow you to enforce greater durability guarantees. A typical scenario would be to " +
+    "allow you to enforce greater durability guarantees. The leader and its followers are " +
+    "all considered replicas so a typical scenario would be to " +
     "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " +
     "produce with acks of \"all\". This will ensure that the producer raises an exception " +
-    "if a majority of replicas do not receive a write."
+    "if a majority of replicas (2) do not receive a write."

Review comment:
   ```suggestion
   "if a majority of replicas, in this case 2, do not receive a write."
   ```
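
   To make the documented scenario concrete, a hedged producer-side sketch (placeholder broker address and topic name; assumes the topic was created with replication factor 3 and min.insync.replicas=2):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DurableProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the leader waits for the full in-sync replica set. With
        // min.insync.replicas=2 on a 3-replica topic, a write must reach at
        // least 2 replicas (leader included) or the send fails with
        // NotEnoughReplicas.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value")); // placeholder topic
        }
    }
}
```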




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah merged pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-04 Thread GitBox


mumrah merged pull request #11166:
URL: https://github.com/apache/kafka/pull/11166


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-04 Thread GitBox


mumrah edited a comment on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892891735


   Spoke with @vvcephei offline about the streams smoke test and it is a known 
flaky test. We are good to go with enabling KRaft for this test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-04 Thread GitBox


mumrah commented on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892891735


   Spoke with @vvcephei about the streams smoke test and it is a known flaky 
test. We are good to go with enabling KRaft for this test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682871868



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -172,7 +172,7 @@ public void removeNamedTopology(final String topologyToRemove) {
  */
 public void cleanUpNamedTopology(final String name) {
     if (getTopologyByName(name).isPresent()) {
-        throw new IllegalStateException("Can't clean up local state for an active NamedTopology");
+        throw new IllegalStateException("Can't clean up local state for an active NamedTopology: ");

Review comment:
   The topology name was not included in the message? :)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -1193,6 +1189,18 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Long offset)
     }
 }
 
+/**
+ * Checks for added or removed NamedTopologies that correspond to any assigned tasks, and creates/freezes them if so
+ */
+void handleTopologyUpdates() {
+    tasks.maybeCreateTasksFromNewTopologies();
+    for (final Task task : activeTaskIterable()) {
+        if (topologyMetadata.namedTopologiesView().contains(task.id().namedTopology())) {
+            task.freezeProcessing();

Review comment:
   Not sure I understand this logic... could you explain a bit? Also, there's 
no unfreeze function, so once a task is frozen it would stick with that state 
forever?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java
##
@@ -84,6 +86,15 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
     this.mainConsumer = mainConsumer;
 }
 
+void handleNewAssignmentAndCreateTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                       final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                       final Set<TaskId> assignedActiveTasks,
+                                       final Set<TaskId> assignedStandbyTasks) {
+    activeTaskCreator.removeRevokedUnknownTasks(diff(HashSet::new, assignedActiveTasks, activeTasksToCreate.keySet()));

Review comment:
   The diff between `assignedActiveTasks` and `activeTasksToCreate` is the set 
of tasks that are already owned and hence do not need to be assigned, am I 
reading that right (ditto for standbys)? Why should these be considered 
revoked and removed?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10788: KAFKA-12648: Pt. 3 - addNamedTopology API

2021-08-04 Thread GitBox


guozhangwang commented on a change in pull request #10788:
URL: https://github.com/apache/kafka/pull/10788#discussion_r682861544



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -64,6 +65,9 @@
 private final Map<TaskId, StreamsProducer> taskProducers;
 private final StreamThread.ProcessingMode processingMode;
 
+// tasks may be assigned for a NamedTopology that is not yet known by this host, and saved for later creation
+private final Map<TaskId, Set<TopicPartition>> unknownTasksToBeCreated = new HashMap<>();

Review comment:
   Yeah I left this comment before syncing with @ableegoldman offline, so 
it's better to clarify for others reading this PR :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13166) EOFException when Controller handles unknown API

2021-08-04 Thread David Arthur (Jira)
David Arthur created KAFKA-13166:


 Summary: EOFException when Controller handles unknown API
 Key: KAFKA-13166
 URL: https://issues.apache.org/jira/browse/KAFKA-13166
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur
Assignee: David Arthur


When ControllerApis handles an unsupported RPC, it silently drops the request 
due to an unhandled exception. 

The following stack trace was manually printed since this exception was 
suppressed on the controller. 
{code}
java.util.NoSuchElementException: key not found: UpdateFeatures
  at scala.collection.MapOps.default(Map.scala:274)
  at scala.collection.MapOps.default$(Map.scala:273)
  at scala.collection.AbstractMap.default(Map.scala:405)
  at scala.collection.mutable.HashMap.apply(HashMap.scala:425)
  at kafka.network.RequestChannel$Metrics.apply(RequestChannel.scala:74)
  at kafka.network.RequestChannel.$anonfun$updateErrorMetrics$1(RequestChannel.scala:458)
  at kafka.network.RequestChannel.$anonfun$updateErrorMetrics$1$adapted(RequestChannel.scala:457)
  at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
  at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
  at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
  at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
  at kafka.network.RequestChannel.updateErrorMetrics(RequestChannel.scala:457)
  at kafka.network.RequestChannel.sendResponse(RequestChannel.scala:388)
  at kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:93)
  at kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:121)
  at kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:78)
  at kafka.server.ControllerApis.handle(ControllerApis.scala:116)
  at kafka.server.ControllerApis.$anonfun$handleEnvelopeRequest$1(ControllerApis.scala:125)
  at kafka.server.ControllerApis.$anonfun$handleEnvelopeRequest$1$adapted(ControllerApis.scala:125)
  at kafka.server.EnvelopeUtils$.handleEnvelopeRequest(EnvelopeUtils.scala:65)
  at kafka.server.ControllerApis.handleEnvelopeRequest(ControllerApis.scala:125)
  at kafka.server.ControllerApis.handle(ControllerApis.scala:103)
  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
  at java.lang.Thread.run(Thread.java:748)
{code}

This is due to a bug in the metrics code in RequestChannel.

The result is that the request fails, but no indication is given that it was 
due to an unsupported API on either the broker, controller, or client.
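
For illustration, a hedged Java sketch of the defensive lookup pattern (the actual fix is in the Scala RequestChannel code, not shown here):
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hedged sketch, not the actual RequestChannel patch: the bug pattern is a
// map lookup that throws on a missing key (Scala's mutable.HashMap.apply).
// A lookup that tolerates unknown keys keeps the error path from throwing.
public class ErrorMetricsSketch {
    private final Map<String, AtomicLong> errorCounts = new ConcurrentHashMap<>();

    // Registers an unseen API name on first use instead of throwing
    // NoSuchElementException the way a strict lookup would.
    public void recordError(String apiName) {
        errorCounts.computeIfAbsent(apiName, k -> new AtomicLong()).incrementAndGet();
    }
}
{code}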




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino commented on a change in pull request #11175: MINOR: Fix getting started documentation

2021-08-04 Thread GitBox


rondagostino commented on a change in pull request #11175:
URL: https://github.com/apache/kafka/pull/11175#discussion_r682834005



##
File path: config/kraft/README.md
##
@@ -14,8 +14,9 @@ Most important of all, KRaft mode is more scalable.  We expect to be able to [su
 # Quickstart
 
 ## Warning
-KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production.  We do not yet support upgrading existing ZooKeeper-based Kafka clusters into this mode.  In fact, when Kafka 3.1 is released, it may not be possible to upgrade your KRaft clusters from 3.0 to 3.1.  There may be bugs, including serious ones.  You should *assume that your data could be lost at any time* if you try the preview release of KRaft mode.
+KRaft mode in Kafka 3.0 is provided for testing only, *NOT* for production.  We do not yet support upgrading existing ZooKeeper-based Kafka clusters into KRaft mode.
+It is not possible to upgrade KRaft clusters from 2.8 to 3.0. Upgrading KRaft clusters from 3.0 to 3.1 will be supported. There may be bugs, including serious ones.

Review comment:
   > Upgrading KRaft clusters from 3.0 to 3.1 will be supported.
   
   Is that guaranteed to be true?  Would it be better to say something like 
"Upgrading KRaft clusters from 3.0 to 3.1 is likely to be supported, but this 
is not guaranteed."




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr closed pull request #11159: MINOR: Change default node id in kraft broker properties

2021-08-04 Thread GitBox


dielhennr closed pull request #11159:
URL: https://github.com/apache/kafka/pull/11159


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11110: MINOR: move tiered storage related configs to a separate class within LogConfig

2021-08-04 Thread GitBox


junrao commented on a change in pull request #11110:
URL: https://github.com/apache/kafka/pull/11110#discussion_r682820398



##
File path: core/src/main/scala/kafka/log/LogConfig.scala
##
@@ -107,46 +107,52 @@ case class LogConfig(props: java.util.Map[_, _], overriddenConfigs: Set[String]
   val LeaderReplicationThrottledReplicas = getList(LogConfig.LeaderReplicationThrottledReplicasProp)
   val FollowerReplicationThrottledReplicas = getList(LogConfig.FollowerReplicationThrottledReplicasProp)
   val messageDownConversionEnable = getBoolean(LogConfig.MessageDownConversionEnableProp)
-  val remoteStorageEnable = getBoolean(LogConfig.RemoteLogStorageEnableProp)
 
-  val localRetentionMs: Long = {
-    val localLogRetentionMs = getLong(LogConfig.LocalLogRetentionMsProp)
+  class TieredLogConfig {

Review comment:
   This probably should be named RemoteLogConfig to match 
RemoteLogManagerConfig?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on pull request #11141: KAFKA-13142: Validate dynamic config alterations prior to forwarding them to the KRaft controller.

2021-08-04 Thread GitBox


dielhennr commented on pull request #11141:
URL: https://github.com/apache/kafka/pull/11141#issuecomment-892837165


   @hachikuji @rajinisivaram since this build is green, I'm fairly confident 
that dynamic config behavior hasn't changed in a zk cluster. I am wondering if 
it is better to have buggy validation than no validation for KRaft brokers 
(assuming this is still buggy). Do either of you anticipate any problems with 
doing this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11154: KAFKA-13068: Rename Log to UnifiedLog

2021-08-04 Thread GitBox


junrao commented on a change in pull request #11154:
URL: https://github.com/apache/kafka/pull/11154#discussion_r682801773



##
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##
@@ -248,16 +250,16 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  *  will be deleted to avoid ID conflicts upon re-upgrade.
  */
 @threadsafe
-class Log(@volatile var logStartOffset: Long,
-          private val localLog: LocalLog,
-          brokerTopicStats: BrokerTopicStats,
-          val producerIdExpirationCheckIntervalMs: Int,
-          @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
-          val producerStateManager: ProducerStateManager,
-          @volatile private var _topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+class UnifiedLog(@volatile var logStartOffset: Long,
+                 private val localLog: LocalLog,
+                 brokerTopicStats: BrokerTopicStats,
+                 val producerIdExpirationCheckIntervalMs: Int,
+                 @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
+                 val producerStateManager: ProducerStateManager,
+                 @volatile private var _topicId: Option[Uuid],
+                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
 
-  import kafka.log.Log._
+  import kafka.log.UnifiedLog._

Review comment:
   Should we rename the logging prefix to UnifiedLog too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11150: MINOR: Fix missing word in LogLoader logged warning

2021-08-04 Thread GitBox


ijuma merged pull request #11150:
URL: https://github.com/apache/kafka/pull/11150


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates

2021-08-04 Thread GitBox


cmccabe commented on pull request #11168:
URL: https://github.com/apache/kafka/pull/11168#issuecomment-892828330


   Thanks again for the PR. I left another comment.
   
   We need some kind of test for this. The simplest way to do it is probably to 
add an integration test for setting dynamic configs in `KRaftClusterTest.scala`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11168: KAFKA-13160: Fix BrokerMetadataPublisher to pass the correct resource name to the config handler when processing config updates

2021-08-04 Thread GitBox


cmccabe commented on a change in pull request #11168:
URL: https://github.com/apache/kafka/pull/11168#discussion_r682803659



##
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
##
@@ -203,7 +203,15 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   }
   tag.foreach { t =>
     val newProperties = newImage.configs().configProperties(configResource)
-    dynamicConfigHandlers(t).processConfigChanges(configResource.name(), newProperties)
+    val maybeDefaultName = if (conf.usesSelfManagedQuorum) {

Review comment:
   BrokerMetadataPublisher is only used when in KRaft mode, so this check 
is not necessary.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11137: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest

2021-08-04 Thread GitBox


cmccabe commented on a change in pull request #11137:
URL: https://github.com/apache/kafka/pull/11137#discussion_r682796708



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##
@@ -135,90 +133,88 @@
     private final int generation = 5;
     private final String connector = "connector";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private Connector insConnector;
 
-    @MockStrict private Worker worker;
-    @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
-    @MockStrict private ConfigBackingStore configStore;
-    @MockStrict private StatusBackingStore statusStore;
+    final private Worker worker = mock(Worker.class);
+    final private WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class);
+    final private Plugins plugins = mock(Plugins.class);
+    final private ClassLoader classLoader = mock(ClassLoader.class);
+    final private ConfigBackingStore configStore = mock(ConfigBackingStore.class);
+    final private StatusBackingStore statusStore = mock(StatusBackingStore.class);
+    private ClassLoader loader;
+
+    @Before
+    public void before() {
+        loader = Utils.getContextOrKafkaClassLoader();
+    }
+
+    @After
+    public void tearDown() {
+        if (loader != null) Plugins.compareAndSwapLoaders(loader);
+    }
 
     @Test
     public void testConnectors() {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
-        replayAll();
+        //AbstractHerder herder = partialMockBuilder(AbstractHerder.class)

Review comment:
   it seems like you intended to remove this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11137: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest

2021-08-04 Thread GitBox


cmccabe commented on a change in pull request #11137:
URL: https://github.com/apache/kafka/pull/11137#discussion_r682796708



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##
@@ -135,90 +133,88 @@
     private final int generation = 5;
     private final String connector = "connector";
    private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private Connector insConnector;
 
-    @MockStrict private Worker worker;
-    @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
-    @MockStrict private ConfigBackingStore configStore;
-    @MockStrict private StatusBackingStore statusStore;
+    final private Worker worker = mock(Worker.class);
+    final private WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class);
+    final private Plugins plugins = mock(Plugins.class);
+    final private ClassLoader classLoader = mock(ClassLoader.class);
+    final private ConfigBackingStore configStore = mock(ConfigBackingStore.class);
+    final private StatusBackingStore statusStore = mock(StatusBackingStore.class);
+    private ClassLoader loader;
+
+    @Before
+    public void before() {
+        loader = Utils.getContextOrKafkaClassLoader();
+    }
+
+    @After
+    public void tearDown() {
+        if (loader != null) Plugins.compareAndSwapLoaders(loader);
+    }
 
     @Test
    public void testConnectors() {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
-        replayAll();
+        //AbstractHerder herder = partialMockBuilder(AbstractHerder.class)

Review comment:
   why is this present but commented out?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11130: KAFKA-13138: FileConfigProvider#get should keep failure exception

2021-08-04 Thread GitBox


cmccabe merged pull request #11130:
URL: https://github.com/apache/kafka/pull/11130


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11175: MINOR: Fix getting started documentation

2021-08-04 Thread GitBox


cmccabe commented on a change in pull request #11175:
URL: https://github.com/apache/kafka/pull/11175#discussion_r682789187



##
File path: config/kraft/broker.properties
##
@@ -24,7 +24,7 @@
 process.roles=broker
 
 # The node id associated with this instance's roles
-node.id=1
+node.id=2

Review comment:
   This isn't strictly necessary because the id space for brokers and 
controllers is distinct. I guess it's fine to do this though, to avoid 
confusion?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #11175: MINOR: Fix getting started documentation

2021-08-04 Thread GitBox


cmccabe commented on a change in pull request #11175:
URL: https://github.com/apache/kafka/pull/11175#discussion_r682788755



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1725,7 +1725,14 @@ private long maybeSendRequest(
     return Long.MAX_VALUE;
 }
 
-return connection.remainingRequestTimeMs(currentTimeMs);
+long connectionWaitTimeMs = connection.remainingRequestTimeMs(currentTimeMs);
+logger.trace(

Review comment:
   can we use less vertical space here




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio opened a new pull request #11175: MINOR: Fix getting started documentation

2021-08-04 Thread GitBox


jsancio opened a new pull request #11175:
URL: https://github.com/apache/kafka/pull/11175


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393313#comment-17393313
 ] 

Colin McCabe commented on KAFKA-13165:
--

The node ID space of controllers and brokers is disjoint. So we cannot find out 
anything useful by checking if the broker's node.id is in 
controller.quorum.voters.

Brokers should be observers in the raft quorum, so I don't see how the broker 
becoming a leader could be a problem. Maybe this is a loophole we need to 
close. We should be able to tell the RaftClient whether it is an observer or a 
voter.

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> Under certain configurations it is possible for the Kafka Server to boot up as a 
> broker only but be the cluster metadata quorum leader. We should validate the 
> configuration to avoid this case.
>  # If {{process.roles}} contains {{controller}}, then the {{node.id}} needs 
> to be in {{controller.quorum.voters}}
>  # If {{process.roles}} doesn't contain {{controller}}, then the {{node.id}} 
> cannot be in {{controller.quorum.voters}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Colin McCabe (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13165?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393313#comment-17393313
 ] 

Colin McCabe edited comment on KAFKA-13165 at 8/4/21, 4:38 PM:
---

The node ID space of controllers and brokers is disjoint. So we cannot find out 
anything useful by checking if the broker's node.id is in 
controller.quorum.voters.

Brokers should be observers in the raft quorum, so I don't see how the broker 
becoming a leader could possibly happen. Maybe this is a loophole we need to 
close. We should be able to tell the RaftClient whether it is an observer or a 
voter.


was (Author: cmccabe):
The node ID space of controllers and brokers is disjoint. So we cannot find out 
anything useful by checking if the broker's node.id is in 
controller.quorum.voters.

Brokers should be observers in the raft quorum, so I don't see how the broker 
becoming a leader could be a problem. Maybe this is a loophole we need to 
close. We should be able to tell the RaftClient whether it is an observer or a 
voter.

> Validate node id, process role and quorum voters
> 
>
> Key: KAFKA-13165
> URL: https://issues.apache.org/jira/browse/KAFKA-13165
> Project: Kafka
>  Issue Type: Sub-task
>  Components: kraft
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> Under certain configurations it is possible for the Kafka Server to boot up as a 
> broker only but be the cluster metadata quorum leader. We should validate the 
> configuration to avoid this case.
>  # If {{process.roles}} contains {{controller}}, then the {{node.id}} needs 
> to be in {{controller.quorum.voters}}
>  # If {{process.roles}} doesn't contain {{controller}}, then the {{node.id}} 
> cannot be in {{controller.quorum.voters}}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #11159: MINOR: Change default node id in kraft broker properties

2021-08-04 Thread GitBox


jsancio commented on a change in pull request #11159:
URL: https://github.com/apache/kafka/pull/11159#discussion_r682775395



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -552,7 +552,10 @@ class ControllerApis(val requestChannel: RequestChannel,
   def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
     val registrationRequest = request.body[BrokerRegistrationRequest]
     authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
-
+    val controllerNode = controllerNodes.find(node => node.id == registrationRequest.brokerId)
+    if (controllerNode.isDefined) {
+      info(s"A broker is registering with ${KafkaConfig.NodeIdProp}=${controllerNode.get.id} which is also in use by a controller.")
+    }

Review comment:
   I would remove this log message. This is a valid configuration and I 
don't think it is providing any additional information.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/BrokerRegistrationRequest.java
##
@@ -57,6 +57,10 @@ public BrokerRegistrationRequestData data() {
 return data;
 }
 
+public int brokerId() {
+return data.brokerId();
+}
+

Review comment:
   I think this pattern undermines the advantages of the message JSON 
schema and the Java code generated from it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-04 Thread GitBox


mumrah edited a comment on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170


   Streams smoke test runs: 
   * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/
   * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4631/
   * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4636/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

2021-08-04 Thread GitBox


jolshan commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r682739288



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, 
offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-isFutureReplica match {
-  case true if futureLog.isEmpty =>
-val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+val logOpt = if (isFutureReplica) futureLog else log
+if (logOpt.isEmpty) {
+  val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+  if (isFutureReplica)
 this.futureLog = Option(log)
-  case false if log.isEmpty =>
-val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+  else
 this.log = Option(log)
-  case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} 
already exists.")
+} else {
+  trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+  logOpt.foreach { log =>
+if (log.topicId.isEmpty) {

Review comment:
   I'm a little unsure what you mean by the topicId being consistent here. 
(There would have been no log/topic ID in the log to compare to.) We can add 
an extra check when assigning the topic ID to ensure there is no way to assign 
inconsistently.
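
   Something like the following rough sketch, perhaps (hypothetical Java shape; the actual code is Scala in Partition/Log):

```java
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch of the "extra check when assigning the topic ID" idea:
// refuse to overwrite an already-assigned ID with a different one.
public class TopicIdHolder {
    private Optional<UUID> topicId = Optional.empty();

    public synchronized void assignTopicId(UUID requestedId) {
        if (topicId.isPresent() && !topicId.get().equals(requestedId)) {
            throw new IllegalStateException("Topic ID " + requestedId
                + " does not match existing topic ID " + topicId.get());
        }
        topicId = Optional.of(requestedId);
    }
}
```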




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13165) Validate node id, process role and quorum voters

2021-08-04 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13165:
--

 Summary: Validate node id, process role and quorum voters
 Key: KAFKA-13165
 URL: https://issues.apache.org/jira/browse/KAFKA-13165
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Reporter: Jose Armando Garcia Sancio


Under certain configurations it is possible for the Kafka Server to boot up as a 
broker only but be the cluster metadata quorum leader. We should validate the 
configuration to avoid this case.
 # If {{process.roles}} contains {{controller}}, then the {{node.id}} needs to 
be in {{controller.quorum.voters}}
 # If {{process.roles}} doesn't contain {{controller}}, then the {{node.id}} 
cannot be in {{controller.quorum.voters}}
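
For illustration, the two checks could be expressed roughly as follows (a minimal sketch with hypothetical names, not the actual KafkaConfig validation code):
{code:java}
import java.util.Set;

// Hypothetical illustration of the two rules above; the real validation would
// live in KafkaConfig / KafkaRaftServer and operate on the parsed config.
public class KRaftRoleValidator {
    public static void validate(Set<String> processRoles, int nodeId, Set<Integer> quorumVoterIds) {
        boolean isController = processRoles.contains("controller");
        boolean isVoter = quorumVoterIds.contains(nodeId);
        if (isController && !isVoter) {
            throw new IllegalArgumentException("process.roles contains 'controller' but node.id "
                + nodeId + " is not in controller.quorum.voters");
        }
        if (!isController && isVoter) {
            throw new IllegalArgumentException("node.id " + nodeId
                + " is in controller.quorum.voters but process.roles does not contain 'controller'");
        }
    }
}
{code}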



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2021-08-04 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo updated KAFKA-10413:
---
Description: 
GHi,

With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, if 
a connect instance disappears, or a new one appears, we're seeing unbalanced 
consumption, much like mentioned in this post:

[https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]


This usually leads to one kafka connect instance taking most of the load and 
consumption not being able to keep up.
Currently, we're "fixing" this by deleting the connector and re-creating it, 
but this is far from ideal.

Any suggestions on what we could do to mitigate this?

  was:
Hi,

With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, if 
a connect instance disappears, or a new one appears, we're seeing unbalanced 
consumption, much like mentioned in this post:

[https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]


This usually leads to one kafka connect instance taking most of the load and 
consumption not being able to keep up.
Currently, we're "fixing" this by deleting the connector and re-creating it, 
but this is far from ideal.

Any suggestions on what we could do to mitigate this?


> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png
>
>
> GHi,
> With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, 
> if a connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like mentioned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one kafka connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestions on what we could do to mitigate this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-10413) rebalancing leads to unevenly balanced connectors

2021-08-04 Thread yazgoo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yazgoo reopened KAFKA-10413:


Hi,

I'm reopening this because I'm still seeing this issue with CP 6.2.0, which 
ships with 2.8.0, which is marked as a fixed version.
We basically see the same behavior as mentioned in the issue description.

> rebalancing leads to unevenly balanced connectors
> -
>
> Key: KAFKA-10413
> URL: https://issues.apache.org/jira/browse/KAFKA-10413
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1
>Reporter: yazgoo
>Assignee: rameshkrishnan muthusamy
>Priority: Major
> Fix For: 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
> Attachments: connect_worker_balanced.png
>
>
> Hi,
> With CP 5.5, running kafka connect s3 sink on EC2 with autoscaling enabled, 
> if a connect instance disappears, or a new one appears, we're seeing unbalanced 
> consumption, much like mentioned in this post:
> [https://stackoverflow.com/questions/58644622/incremental-cooperative-rebalancing-leads-to-unevenly-balanced-connectors]
> This usually leads to one kafka connect instance taking most of the load and 
> consumption not being able to keep up.
> Currently, we're "fixing" this by deleting the connector and re-creating it, 
> but this is far from ideal.
> Any suggestions on what we could do to mitigate this?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei edited a comment on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-04 Thread GitBox


vvcephei edited a comment on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892354449


   I went ahead and kicked off a full system test build to evaluate the other 
tests that got changed in this PR: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4633/console
   
   EDIT: Results: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2021-08-04--001.system-test-kafka-branch-builder--1628079409--kamalcph--KAFKA-12724--fbdce206b/report.html


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei edited a comment on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-04 Thread GitBox


vvcephei edited a comment on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892707889


   There were a number of failures in the prior run. Some were timeouts, and 
some of the Streams failures were due to the HA assignor giving only standbys 
to some nodes. I spot-checked by re-running a few of the tests locally and 
confirmed that they were flaky failures.
   
   I re-ran the tests 
(https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4635/)
   * with "spot instance" disabled
   * having added a change to the Streams tests intended to disable the HA 
assignor logic in favor of a one-shot balanced assignment.
   
   I'll keep an eye on that test run to make sure it's clean.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-04 Thread GitBox


vvcephei commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892707889


   There were a number of failures in the prior run. Some were timeouts, and 
some of the Streams failures were due to the HA assignor giving only standbys 
to some nodes.
   
   I re-ran the tests 
(https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4635/)
   * with "spot instance" disabled
   * having added a change to the Streams tests intended to disable the HA 
assignor logic in favor of a one-shot balanced assignment.
   
   I'll keep an eye on that test run to make sure it's clean.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-04 Thread GitBox


mumrah commented on a change in pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#discussion_r682654917



##
File path: tests/kafkatest/tests/core/replication_test.py
##
@@ -122,14 +122,16 @@ def min_cluster_size(self):
 @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
         broker_type=["leader"],
         security_protocol=["PLAINTEXT"],
-        enable_idempotence=[True])
+        enable_idempotence=[True],
+        metadata_quorum=quorum.all_non_upgrade)
 @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
         broker_type=["leader"],
         security_protocol=["PLAINTEXT", "SASL_SSL"],
         metadata_quorum=quorum.all_non_upgrade)
 @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
         broker_type=["controller"],
-        security_protocol=["PLAINTEXT", "SASL_SSL"])
+        security_protocol=["PLAINTEXT", "SASL_SSL"],
+        metadata_quorum=quorum.all_non_upgrade)

Review comment:
   Good catch @rondagostino. Looks like this was here as a pre-check to 
make sure the matrix args were correct. I'll fix this




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-04 Thread GitBox


mumrah edited a comment on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892069170


   Streams smoke test runs: 
   * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4630/
   * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4631/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah edited a comment on pull request #11166: KAFKA-13159 Enable additional transaction system tests in KRaft

2021-08-04 Thread GitBox


mumrah edited a comment on pull request #11166:
URL: https://github.com/apache/kafka/pull/11166#issuecomment-892067696


   Replication test runs: 
   * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4627/
   * https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4634/


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-04 Thread GitBox


vvcephei commented on pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#issuecomment-892677574


   Thanks @kamalcph . Sorry it took me so long to look into it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-08-04 Thread GitBox


vvcephei commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r682637061



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -25,15 +25,17 @@
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
-    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion
+    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \
                            str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), \
-                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(DEV_BRANCH)]
+                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)]
 
 metadata_1_versions = [str(LATEST_0_10_0)]
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
+metadata_3_10_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4),

Review comment:
   It's probably a typo, but it didn't matter, since this was the wrong 
test to add new versions to, anyway. I backed out these changes and added the 
missing versions to streams_application_upgrade_test.py.








[GitHub] [kafka] axrj commented on pull request #8906: KAFKA-10190: To set replication throttling configs at broker entity-default

2021-08-04 Thread GitBox


axrj commented on pull request #8906:
URL: https://github.com/apache/kafka/pull/8906#issuecomment-892663172


   Hey,
   
   Is there any plan to merge this? It would be great to have this patch 
backported too if possible.






[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-08-04 Thread GitBox


akatona84 commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-892638494


   I'm not sure who puts these lines in the final commit message, but please make sure Daniel is credited in it too:
   ```
   Co-authored-by: Daniel Urban 
   ```






[jira] [Created] (KAFKA-13164) State store is attached to wrong node in the Kafka Streams topology

2021-08-04 Thread Ralph Matthias Debusmann (Jira)
Ralph Matthias Debusmann created KAFKA-13164:


 Summary: State store is attached to wrong node in the Kafka 
Streams topology
 Key: KAFKA-13164
 URL: https://issues.apache.org/jira/browse/KAFKA-13164
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
 Environment: local development (MacOS Big Sur 11.4)
Reporter: Ralph Matthias Debusmann
 Attachments: 1.jpg, 3.jpg

Hi,

mjsax and I noticed a bug where a state store is attached to the wrong node in the Kafka Streams topology.

The issue arose when I tried to read a topic into a KTable, then continued with a mapValues(), and then joined this KTable with a KStream, like so:
 
var kTable = this.streamsBuilder.table().mapValues();
 
and then later:
 
var joinedKStream = kstream.leftJoin(kTable, );
 
The join didn't work, and neither did it work when I added Materialized.as() to mapValues(), like so:
var kTable = this.streamsBuilder.table().mapValues(, *Materialized.as()*);
 
Interestingly, I could get the join to work when I first read the topic into a *KStream*, then continued with the mapValues(), then turned the KStream into a KTable, and then joined the KTable with the other KStream, like so:
 
var kTable = this.streamsBuilder.stream().mapValues().toTable();
 
(the join worked the same as above)
 
When mjsax and I had a look at the topology, we could see that in the former, non-working code, the state store (required for the join) is attached to the pre-final "KTABLE-SOURCE" node, and not to the final "KTABLE-MAPVALUES" node (see attachment "1.jpg"). In the working code, the state store is (correctly) attached to the final "KSTREAM-TOTABLE" node (see attachment "3.jpg").
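 
To make the two shapes concrete, here is a minimal sketch in plain Kafka Streams terms (the class name, topic names, and value mapper below are illustrative placeholders, not taken from this report):
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;

public class JoinVariantsSketch {

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Variant 1 (join did not work): read the topic as a KTable, then mapValues().
        // Per the report, the store ends up on KTABLE-SOURCE instead of KTABLE-MAPVALUES.
        KTable<String, String> mappedTable = builder
                .<String, String>table("input-topic")
                .mapValues(v -> v.toUpperCase(), Materialized.as("mapped-store"));

        // Variant 2 (join worked): read as a KStream, mapValues(), then toTable().
        // Here the store is attached to the final KSTREAM-TOTABLE node.
        KTable<String, String> workingTable = builder
                .<String, String>stream("input-topic")
                .mapValues(v -> v.toUpperCase())
                .toTable(Materialized.as("mapped-store-2"));

        // The stream-table join that needs the KTable's state store.
        KStream<String, String> other = builder.stream("other-topic");
        other.leftJoin(workingTable, (left, right) -> left + ":" + right)
             .to("output-topic");

        return builder.build();
    }
}
{code}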
 
Best regards,
xdgrulez
 
 
 





[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-08-04 Thread GitBox


akatona84 commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-892623567


   @kkonstantine, @rhauch could you take a look at this minor fix with big impact? Thanks!






[jira] [Commented] (KAFKA-10719) MirrorMaker2 fails to update its runtime configuration

2021-08-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393104#comment-17393104
 ] 

Daniel Urban commented on KAFKA-10719:
--

I believe that the core issue here is that the connectors will start up with 
the old config first, and then the new config gets applied by MM2.

If the old listener is not available, that will cause timeouts in the 
rebalance. Connectors have a default timeout which exceeds the default 
rebalance timeout. When the rebalance timeout happens, none of the MM2 nodes 
will be able to update the config after the startup.

Besides deleting the config topic, you can also try increasing the rebalance timeout of the Connect workers running inside MM2 (to something around 3 minutes), which allows the cluster to avoid hitting the rebalance timeout and then successfully apply the new configs.
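
A minimal sketch of that workaround in the MM2 properties file, assuming worker-level Connect settings can be scoped with the usual cluster-alias prefix (the alias {{backup}} and the exact value are illustrative):
{code}
# Raise the Connect worker rebalance timeout inside MM2 so connectors blocked
# on an unreachable old listener do not outlive the rebalance.
# 180000 ms is the ~3 minutes suggested above.
backup.rebalance.timeout.ms = 180000
{code}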

> MirrorMaker2 fails to update its runtime configuration
> --
>
> Key: KAFKA-10719
> URL: https://issues.apache.org/jira/browse/KAFKA-10719
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Peter Sinoros-Szabo
>Priority: Major
>
> I was running the MM2 cluster successfully with the following configuration (slightly simplified):
> {code:java}
> clusters = main, backup
> main.bootstrap.servers = kafkaA:9202,kafkaB:9092,kafkaB:9092
> backup.bootstrap.servers = backupA:9092,backupB:9092,backupC:9092
> main->backup.enabled = true
> main->backup.topics = .*
> {code}
> I wanted to change the bootstrap.address list of the destination cluster to a different list that is pointing to the *same* cluster, just a different listener with a different routing. So I changed it to:
> {code:java}
> backup.bootstrap.servers = backupSecA:1234,backupSecB:1234,backupSecC:1234
> {code}
> I did a rolling restart on the MM2 nodes and saw that some tasks were still using the old bootstrap addresses, while some were using the new ones. I don't have the logs, so unfortunately I don't know which one picked up the good values and which didn't. I even stopped the cluster completely, but it didn't help. Ryanne advised deleting the mm2-config and mm2-status topics, so I deleted those on the destination cluster; that solved this issue.





[jira] [Commented] (KAFKA-9981) Running a dedicated mm2 cluster with more than one nodes,When the configuration is updated the task is not aware and will lose the update operation.

2021-08-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393101#comment-17393101
 ] 

Daniel Urban commented on KAFKA-9981:
-

This KIP aims to fix the issue by adding the REST API to MM2, and also 
improving the config provider reference handling in the MM2 configs: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-710%3A+Full+support+for+distributed+mode+in+dedicated+MirrorMaker+2.0+clusters]

> Running a dedicated mm2 cluster with more than one nodes,When the 
> configuration is updated the task is not aware and will lose the update 
> operation.
> 
>
> Key: KAFKA-9981
> URL: https://issues.apache.org/jira/browse/KAFKA-9981
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.5.0, 2.4.1
>Reporter: victor
>Priority: Major
>
> DistributedHerder.reconfigureConnector reacts to a config update as follows:
> {code:java}
> if (changed) {
>     List<Map<String, String>> rawTaskProps = reverseTransform(connName, configState, taskProps);
>     if (isLeader()) {
>         configBackingStore.putTaskConfigs(connName, rawTaskProps);
>         cb.onCompletion(null, null);
>     } else {
>         // We cannot forward the request on the same thread because this reconfiguration can happen as a result of connector
>         // addition or removal. If we blocked waiting for the response from leader, we may be kicked out of the worker group.
>         forwardRequestExecutor.submit(new Runnable() {
>             @Override
>             public void run() {
>                 try {
>                     String leaderUrl = leaderUrl();
>                     if (leaderUrl == null || leaderUrl.trim().isEmpty()) {
>                         cb.onCompletion(new ConnectException("Request to leader to " +
>                                 "reconfigure connector tasks failed " +
>                                 "because the URL of the leader's REST interface is empty!"), null);
>                         return;
>                     }
>                     String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
>                     log.trace("Forwarding task configurations for connector {} to leader", connName);
>                     RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
>                     cb.onCompletion(null, null);
>                 } catch (ConnectException e) {
>                     log.error("Request to leader to reconfigure connector tasks failed", e);
>                     cb.onCompletion(e, null);
>                 }
>             }
>         });
>     }
> }
> {code}
> The KafkaConfigBackingStore task checks for configuration updates, such as a topic whitelist update. If the KafkaConfigBackingStore task is not running on the leader node, an HTTP request will be sent to notify the leader of the configuration update. However, a dedicated mm2 cluster does not have the HTTP server turned on, so the request will fail to be sent, causing the update operation to be lost.





[jira] [Resolved] (KAFKA-9805) Running MirrorMaker in a Connect cluster,but the task not running

2021-08-04 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona resolved KAFKA-9805.
--
Resolution: Duplicate

> Running MirrorMaker in a Connect cluster,but the task not running
> -
>
> Key: KAFKA-9805
> URL: https://issues.apache.org/jira/browse/KAFKA-9805
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1
> Environment: linux
>Reporter: ZHAO GH
>Priority: Major
>   Original Estimate: 5h
>  Remaining Estimate: 5h
>
> When I am running MirrorMaker in a Connect cluster, sometimes the tasks run, but sometimes the tasks cannot be assigned.
> I posted the connector config to the Connect cluster; here is my config:
> http://99.12.98.33:8083/connectors
> {code}
> {
>   "name": "kafka->kafka241-3",
>   "config": {
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "topics": "MM2-3",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "tasks.max": 8,
>     "sasl.mechanism": "PLAIN",
>     "security.protocol": "SASL_PLAINTEXT",
>     "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "source.cluster.alias": "kafka",
>     "source.cluster.bootstrap.servers": "55.13.104.70:9092,55.13.104.74:9092,55.13.104.126:9092",
>     "source.admin.sasl.mechanism": "PLAIN",
>     "source.admin.security.protocol": "SASL_PLAINTEXT",
>     "source.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "target.cluster.alias": "kafka241",
>     "target.cluster.bootstrap.servers": "55.14.111.22:9092,55.14.111.23:9092,55.14.111.25:9092",
>     "target.admin.sasl.mechanism": "PLAIN",
>     "target.admin.security.protocol": "SASL_PLAINTEXT",
>     "target.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "producer.sasl.mechanism": "PLAIN",
>     "producer.security.protocol": "SASL_PLAINTEXT",
>     "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "consumer.sasl.mechanism": "PLAIN",
>     "consumer.security.protocol": "SASL_PLAINTEXT",
>     "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "consumer.group.id": "mm2-1"
>   }
> }
> {code}
>  
> But when I get the connector status, I find no tasks running:
> http://99.12.98.33:8083/connectors/kafka->kafka241-3/status
> {
>  "name": "kafka->kafka241-3",
>  "connector": {
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  "tasks": [],
>  "type": "source"
> }
>  
> But sometimes the tasks run successfully:
> http://99.12.98.33:8083/connectors/kafka->kafka241-1/status
> {
>  "name": "kafka->kafka241-1",
>  "connector": {
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  "tasks": [
>  {
>  "id": 0,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  {
>  "id": 1,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.33:8083"
>  },
>  {
>  "id": 2,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  }
>  ],
>  "type": "source"
> }
> Has anybody met this problem? How can it be fixed? Is it a bug?





[GitHub] [kafka] akatona84 commented on pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-08-04 Thread GitBox


akatona84 commented on pull request #11174:
URL: https://github.com/apache/kafka/pull/11174#issuecomment-892618088


   Thanks @urbandan for investigating this issue with me and creating the fix. 
Could you take a look?






[GitHub] [kafka] akatona84 opened a new pull request #11174: KAFKA-9747: Creating connect reconfiguration URL safely

2021-08-04 Thread GitBox


akatona84 opened a new pull request #11174:
URL: https://github.com/apache/kafka/pull/11174


   * URL wasn't URL-encoded when forwarding a reconfiguration request to the leader Connect worker
   * handling previously swallowed errors in the Connect RestClient
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-9805) Running MirrorMaker in a Connect cluster,but the task not running

2021-08-04 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393080#comment-17393080
 ] 

Daniel Urban commented on KAFKA-9805:
-

I believe this is the same issue as KAFKA-9747; see the answer from [~akatona] about the issue.

> Running MirrorMaker in a Connect cluster,but the task not running
> -
>
> Key: KAFKA-9805
> URL: https://issues.apache.org/jira/browse/KAFKA-9805
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.1
> Environment: linux
>Reporter: ZHAO GH
>Priority: Major
>   Original Estimate: 5h
>  Remaining Estimate: 5h
>
> When I am running MirrorMaker in a Connect cluster, sometimes the tasks run, but sometimes the tasks cannot be assigned.
> I posted the connector config to the Connect cluster; here is my config:
> http://99.12.98.33:8083/connectors
> {code}
> {
>   "name": "kafka->kafka241-3",
>   "config": {
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "topics": "MM2-3",
>     "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
>     "tasks.max": 8,
>     "sasl.mechanism": "PLAIN",
>     "security.protocol": "SASL_PLAINTEXT",
>     "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "source.cluster.alias": "kafka",
>     "source.cluster.bootstrap.servers": "55.13.104.70:9092,55.13.104.74:9092,55.13.104.126:9092",
>     "source.admin.sasl.mechanism": "PLAIN",
>     "source.admin.security.protocol": "SASL_PLAINTEXT",
>     "source.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "target.cluster.alias": "kafka241",
>     "target.cluster.bootstrap.servers": "55.14.111.22:9092,55.14.111.23:9092,55.14.111.25:9092",
>     "target.admin.sasl.mechanism": "PLAIN",
>     "target.admin.security.protocol": "SASL_PLAINTEXT",
>     "target.admin.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "producer.sasl.mechanism": "PLAIN",
>     "producer.security.protocol": "SASL_PLAINTEXT",
>     "producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "consumer.sasl.mechanism": "PLAIN",
>     "consumer.security.protocol": "SASL_PLAINTEXT",
>     "consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin\";",
>     "consumer.group.id": "mm2-1"
>   }
> }
> {code}
>  
> But when I get the connector status, I find no tasks running:
> http://99.12.98.33:8083/connectors/kafka->kafka241-3/status
> {
>  "name": "kafka->kafka241-3",
>  "connector": {
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  "tasks": [],
>  "type": "source"
> }
>  
> But sometimes the tasks run successfully:
> http://99.12.98.33:8083/connectors/kafka->kafka241-1/status
> {
>  "name": "kafka->kafka241-1",
>  "connector": {
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  "tasks": [
>  {
>  "id": 0,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  },
>  {
>  "id": 1,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.33:8083"
>  },
>  {
>  "id": 2,
>  "state": "RUNNING",
>  "worker_id": "99.12.98.34:8083"
>  }
>  ],
>  "type": "source"
> }
> Has anybody met this problem? How can it be fixed? Is it a bug?





[jira] [Commented] (KAFKA-9747) No tasks created for a connector

2021-08-04 Thread Andras Katona (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17393078#comment-17393078
 ] 

Andras Katona commented on KAFKA-9747:
--

The connector name contains a character which is considered an illegal character by HttpClient::newRequest:
{noformat}
java.lang.IllegalArgumentException: Illegal character in path at index ..
at java.net.URI.create(URI.java:852)
at org.eclipse.jetty.client.HttpClient.newRequest(HttpClient.java:453)
...
Caused by: java.net.URISyntaxException: Illegal character in path at index 
...
at java.net.URI$Parser.fail(URI.java:2848)
at java.net.URI$Parser.checkChars(URI.java:3021)
{noformat}
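
A self-contained illustration of the failure, and of the fix direction of percent-encoding the connector name before it is joined into the request path (the class name and host below are placeholders; the actual patch lives in Connect's REST layer):
{code:java}
import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URLEncoder;

public class ConnectorUrlSketch {
    public static void main(String[] args) throws UnsupportedEncodingException {
        String connName = "qa-s3-sink-task|1"; // '|' is illegal in a URI path

        try {
            // Roughly what HttpClient::newRequest trips over internally:
            URI.create("http://leader:8083/connectors/" + connName + "/tasks");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Percent-encode the connector name before joining it into the path:
        String encoded = URLEncoder.encode(connName, "UTF-8"); // "|" becomes "%7C"
        URI ok = URI.create("http://leader:8083/connectors/" + encoded + "/tasks");
        System.out.println("accepted: " + ok);
    }
}
{code}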

> No tasks created for a connector
> 
>
> Key: KAFKA-9747
> URL: https://issues.apache.org/jira/browse/KAFKA-9747
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04 LTS
> Platform: Confluent Platform 5.4
> HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge
>Reporter: Vit Koma
>Assignee: Andras Katona
>Priority: Major
> Attachments: connect-distributed.properties, connect.log
>
>
> We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
> (MongoDB) and Confluent S3 connectors. When adding a new connector via the 
> REST API the connector is created in RUNNING state, but no tasks are created 
> for the connector.
> Pausing and resuming the connector does not help. When we stop all workers 
> and then start them again, the tasks are created and everything runs as it 
> should.
> The issue does not show up if we run only a single node.
> The issue is not caused by the connector plugins, because we see the same 
> behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
> that Debezium is correctly returning a task configuration from the 
> Connector.taskConfigs() method.
> Connector configuration examples
> Debezium:
> {code}
> {
>   "name": "qa-mongodb-comp-converter-task|1",
>   "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": 
> "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
> "mongodb.name": "qa-debezium-comp",
> "mongodb.ssl.enabled": true,
> "collection.whitelist": "converter[.]task",
> "tombstones.on.delete": true
>   }
> }
> {code}
> S3 Connector:
> {code}
> {
>   "name": "qa-s3-sink-task|1",
>   "config": {
> "connector.class": "io.confluent.connect.s3.S3SinkConnector",
> "topics": "qa-debezium-comp.converter.task",
> "topics.dir": "data/env/qa",
> "s3.region": "eu-west-1",
> "s3.bucket.name": "",
> "flush.size": "15000",
> "rotate.interval.ms": "360",
> "storage.class": "io.confluent.connect.s3.storage.S3Storage",
> "format.class": 
> "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
> "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.DefaultPartitioner",
> "schema.compatibility": "NONE",
> "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> "key.converter.schemas.enable": false,
> "value.converter.schemas.enable": false,
> "transforms": "ExtractDocument",
> 
> "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
>   }
> }
> {code}
> The connectors are created using curl: {{curl -X POST -H "Content-Type: 
> application/json" --data @ http:/:10083/connectors}}





[jira] [Assigned] (KAFKA-9747) No tasks created for a connector

2021-08-04 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona reassigned KAFKA-9747:


Assignee: Andras Katona

> No tasks created for a connector
> 
>
> Key: KAFKA-9747
> URL: https://issues.apache.org/jira/browse/KAFKA-9747
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0
> Environment: OS: Ubuntu 18.04 LTS
> Platform: Confluent Platform 5.4
> HW: The same behaviour on various AWS instances - from t3.small to c5.xlarge
>Reporter: Vit Koma
>Assignee: Andras Katona
>Priority: Major
> Attachments: connect-distributed.properties, connect.log
>
>
> We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
> (MongoDB) and Confluent S3 connectors. When adding a new connector via the 
> REST API the connector is created in RUNNING state, but no tasks are created 
> for the connector.
> Pausing and resuming the connector does not help. When we stop all workers 
> and then start them again, the tasks are created and everything runs as it 
> should.
> The issue does not show up if we run only a single node.
> The issue is not caused by the connector plugins, because we see the same 
> behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
> that Debezium is correctly returning a task configuration from the 
> Connector.taskConfigs() method.
> Connector configuration examples
> Debezium:
> {code}
> {
>   "name": "qa-mongodb-comp-converter-task|1",
>   "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": 
> "mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
> "mongodb.name": "qa-debezium-comp",
> "mongodb.ssl.enabled": true,
> "collection.whitelist": "converter[.]task",
> "tombstones.on.delete": true
>   }
> }
> {code}
> S3 Connector:
> {code}
> {
>   "name": "qa-s3-sink-task|1",
>   "config": {
> "connector.class": "io.confluent.connect.s3.S3SinkConnector",
> "topics": "qa-debezium-comp.converter.task",
> "topics.dir": "data/env/qa",
> "s3.region": "eu-west-1",
> "s3.bucket.name": "",
> "flush.size": "15000",
> "rotate.interval.ms": "360",
> "storage.class": "io.confluent.connect.s3.storage.S3Storage",
> "format.class": 
> "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
> "schema.generator.class": 
> "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
> "partitioner.class": 
> "io.confluent.connect.storage.partitioner.DefaultPartitioner",
> "schema.compatibility": "NONE",
> "key.converter": "org.apache.kafka.connect.json.JsonConverter",
> "value.converter": "org.apache.kafka.connect.json.JsonConverter",
> "key.converter.schemas.enable": false,
> "value.converter.schemas.enable": false,
> "transforms": "ExtractDocument",
> 
> "transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
>   }
> }
> {code}
> The connectors are created using curl: {{curl -X POST -H "Content-Type: 
> application/json" --data @ http:/:10083/connectors}}





[jira] [Updated] (KAFKA-9747) No tasks created for a connector

2021-08-04 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona updated KAFKA-9747:
-
Description: 
We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
(MongoDB) and Confluent S3 connectors. When adding a new connector via the REST 
API the connector is created in RUNNING state, but no tasks are created for the 
connector.

Pausing and resuming the connector does not help. When we stop all workers and 
then start them again, the tasks are created and everything runs as it should.

The issue does not show up if we run only a single node.

The issue is not caused by the connector plugins, because we see the same 
behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
that Debezium is correctly returning a task configuration from the 
Connector.taskConfigs() method.

Connector configuration examples

Debezium:
{code}
{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": 
"mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
"mongodb.name": "qa-debezium-comp",
"mongodb.ssl.enabled": true,
"collection.whitelist": "converter[.]task",
"tombstones.on.delete": true
  }
}
{code}
S3 Connector:
{code}
{
  "name": "qa-s3-sink-task|1",
  "config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "qa-debezium-comp.converter.task",
"topics.dir": "data/env/qa",
"s3.region": "eu-west-1",
"s3.bucket.name": "",
"flush.size": "15000",
"rotate.interval.ms": "360",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
"schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": 
"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"transforms": "ExtractDocument",

"transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
  }
}
{code}
The connectors are created using curl: {{curl -X POST -H "Content-Type: 
application/json" --data @ http:/:10083/connectors}}



  was:
We are running Kafka Connect in a distributed mode on 3 nodes using Debezium 
(MongoDB) and Confluent S3 connectors. When adding a new connector via the REST 
API the connector is created in RUNNING state, but no tasks are created for the 
connector.

Pausing and resuming the connector does not help. When we stop all workers and 
then start them again, the tasks are created and everything runs as it should.

The issue does not show up if we run only a single node.

The issue is not caused by the connector plugins, because we see the same 
behaviour for both Debezium and S3 connectors. Also in debug logs I can see 
that Debezium is correctly returning a task configuration from the 
Connector.taskConfigs() method.

Connector configuration examples

Debezium:

{
  "name": "qa-mongodb-comp-converter-task|1",
  "config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": 
"mongodb-qa-001:27017,mongodb-qa-002:27017,mongodb-qa-003:27017",
"mongodb.name": "qa-debezium-comp",
"mongodb.ssl.enabled": true,
"collection.whitelist": "converter[.]task",
"tombstones.on.delete": true
  }
}

S3 Connector:

{
  "name": "qa-s3-sink-task|1",
  "config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "qa-debezium-comp.converter.task",
"topics.dir": "data/env/qa",
"s3.region": "eu-west-1",
"s3.bucket.name": "",
"flush.size": "15000",
"rotate.interval.ms": "360",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "custom.kafka.connect.s3.format.plaintext.PlaintextFormat",
"schema.generator.class": 
"io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": 
"io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter.schemas.enable": false,
"transforms": "ExtractDocument",

"transforms.ExtractDocument.type":"custom.kafka.connect.transforms.ExtractDocument$Value"
  }
}

The connectors are created using curl: {{curl -X POST -H "Content-Type: 
application/json" --data @ http:/:10083/connectors}}




> No tasks created for a connector
> 

[jira] [Created] (KAFKA-13163) Issue with MySql worker sink connector

2021-08-04 Thread Muddam Pullaiah Yadav (Jira)
Muddam Pullaiah Yadav created KAFKA-13163:
-

 Summary: Issue with MySql worker sink connector
 Key: KAFKA-13163
 URL: https://issues.apache.org/jira/browse/KAFKA-13163
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.1.1
 Environment: PreProd
Reporter: Muddam Pullaiah Yadav


Please help with the following issue. Really appreciate it! 

 

We are using an Azure HDInsight Kafka cluster.

My sink Properties:

 

cat mysql-sink-connector
{
  "name": "mysql-sink-connector",
  "config": {
    "tasks.max": "2",
    "batch.size": "1000",
    "batch.max.rows": "1000",
    "poll.interval.ms": "500",
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "connection.url": "jdbc:mysql://moddevdb.mysql.database.azure.com:3306/db_grab_dev",
    "table.name": "db_grab_dev.tbl_clients_merchants",
    "topics": "test",
    "connection.user": "grabmod",
    "connection.password": "#admin",
    "auto.create": "true",
    "auto.evolve": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true"
  }
}

 

[2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
 org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
handler
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.kafka.connect.errors.DataException: Unknown schema type: 
null
 at 
org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:743)
 at 
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:363)
 at 
org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at 
org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
 [2021-08-04 11:18:30,234] ERROR WorkerSinkTask{id=mysql-sink-connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
 [2021-08-04 11:18:30,235] INFO [Consumer clientId=consumer-18, 
groupId=connect-mysql-sink-connector] Sending LeaveGroup request to coordinator 
wn2-grabde.fkgw2p1emuqu5d21xcbqrhqqbf.rx.internal.cloudapp.net:9092 (id: 
2147482646 rack: null) 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:782)
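
For context on the DataException above: with schemas.enable=true, Connect's JsonConverter expects each JSON record to be an envelope carrying explicit "schema" and "payload" fields, and a record whose schema cannot be resolved surfaces in convertToConnect as "Unknown schema type: null". A minimal sketch of the envelope shape the converter accepts for a string key (the field values here are illustrative, not from this report):
{code}
{
  "schema": { "type": "string", "optional": false },
  "payload": "merchant-42"
}
{code}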





[GitHub] [kafka] jlprat commented on pull request #10784: KAFKA-12862: Update Scala fmt library and apply fixes

2021-08-04 Thread GitBox


jlprat commented on pull request #10784:
URL: https://github.com/apache/kafka/pull/10784#issuecomment-892521275


   Pinging @vvcephei
   Would you be able to merge this before more commits get in the way and we need another rebase?
   Thanks a ton!






[jira] [Commented] (KAFKA-12468) Initial offsets are copied from source to target cluster

2021-08-04 Thread Alexis Josephides (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17392914#comment-17392914
 ] 

Alexis Josephides commented on KAFKA-12468:
---

Thanks for the suggestions, and apologies for the delay in updating how we handled this issue in the end.
I should say from the outset that we did not completely remove this issue, but we minimised the occurrences, fixed some, and in the remainder - lived with it.
The first step was minimisation. We achieved this by phasing how we turned on our connectors. The first connector we applied was the `Source` connector. For our setup we had a number of source connectors - some set to replicate from `latest` and others from `earliest`. We let this connector run and replicate until we hit a steady state and all replication was confirmed to be at the head of the relevant topics. This soak could take a few days depending on your data volumes, throughputs (client limits), etc.
Once the soak completed, we turned on the Checkpoint connector.

If negative offsets remained after this first step, we took steps to manage them. There are two categories here: partitions that have data on them and partitions that have none.
In the first instance (data on the partitions), the first thing we tried was to `delete` the affected consumer group. This is absolutely fine to do because a) there are no consumers on the target cluster yet, and b) the group is replicated again by MM2.
In 90% of instances the negative offset was corrected.
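
That deletion step, sketched with the stock CLI (the bootstrap server and group name are placeholders):
{code}
# Delete the replicated consumer group on the target cluster so MM2 can
# re-sync it; safe here because no consumers run on the target yet.
bin/kafka-consumer-groups.sh --bootstrap-server target-cluster:9092 \
  --delete --group my-consumer-group
{code}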

In the second instance (no data on the partitions), we first examined whether we could publish data (on the source cluster) onto the topic to put data onto the partition, followed by a refresh (delete) of the affected consumer group. This was possible only if the downstream consumer either handled dummy garbage messages gracefully or was fine with a small number of duplicate messages.

What if, following the above, a negative offset remained?
In the instance where there was zero data on a partition and no new data could be published to it, we let the consumer migrate onto the target cluster without much worry. The Kafka consumer behaviour at this point would be to look at a negative offset and throw a warning that it was out of range. It would then reset its offset on the cluster to its default setting - either consume from `latest` or `earliest`. Since there is zero data on that partition, these are one and the same thing.

For instances (rare, but they did occur) where a negative offset remained and there was data on the partition, we still migrated and relied on the consumer behaviour to reset its offset to either `earliest` or `latest`. Depending on the consumer and its use case, we picked whichever best suited the scenario.

Hope this is helpful in some way to others that might be experiencing these 
issues.

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Priority: Major
>
> We have an active-passive setup where  the 3 connectors from mirror maker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. Only when we reset the offsets for each topic/partition on 
> the target cluster and produce a record on the topic/partition in the source, 
> the sync starts working correctly. 
> I would expect that the consumer groups are synced but that the current 
> offsets of the source cluster are not copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
>   

[GitHub] [kafka] kamalcph commented on pull request #10259: MINOR: Provide valid examples in README page.

2021-08-04 Thread GitBox


kamalcph commented on pull request #10259:
URL: https://github.com/apache/kafka/pull/10259#issuecomment-892421407


   @chia7712 
   Can you please review this patch?






[GitHub] [kafka] kamalcph closed pull request #10126: MINOR: Fix wrong Import control declaration

2021-08-04 Thread GitBox


kamalcph closed pull request #10126:
URL: https://github.com/apache/kafka/pull/10126


   





