[jira] [Created] (KAFKA-13968) Broker should not generator snapshot until been unfenced
dengziming created KAFKA-13968:
Summary: Broker should not generator snapshot until been unfenced
Key: KAFKA-13968
URL: https://issues.apache.org/jira/browse/KAFKA-13968
Project: Kafka
Issue Type: Bug
Components: kraft
Reporter: dengziming
Assignee: dengziming

There is a bug when computing `FeaturesDelta` that causes us to generate a snapshot on every commit:

[2022-06-08 13:07:43,010] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 0... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,222] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 2... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:43,727] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 3... (kafka.server.metadata.BrokerMetadataSnapshotter:66)
[2022-06-08 13:07:44,228] INFO [BrokerMetadataSnapshotter id=0] Creating a new snapshot at offset 4... (kafka.server.metadata.BrokerMetadataSnapshotter:66)

Before a broker is unfenced, it does not start publishing metadata, so generating a snapshot at that point is meaningless.

-- This message was sent by Atlassian Jira (v8.20.7#820007)
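The two conditions described in the issue can be sketched in Java. This is a hypothetical, simplified model and not Kafka's `BrokerMetadataSnapshotter` or `FeaturesDelta`. The class `SnapshotGate`, its methods, and the plain `Map<String, Short>` used to represent finalized feature levels are all assumptions made for illustration. It covers only what the issue names: an unchanged feature map must not count as a delta, and no snapshot is taken before the broker is unfenced. Any other snapshot triggers are out of scope.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/**
 * Hypothetical sketch, not Kafka's BrokerMetadataSnapshotter or FeaturesDelta.
 * Finalized feature levels are modeled as a plain map from feature name to level.
 */
final class SnapshotGate {
    private SnapshotGate() {}

    /**
     * Returns a delta only when the feature levels actually changed. Reporting a
     * delta for identical maps would make every commit look like a feature change.
     */
    static Optional<Map<String, Short>> featuresDelta(Map<String, Short> before,
                                                      Map<String, Short> after) {
        return Objects.equals(before, after) ? Optional.empty() : Optional.of(after);
    }

    /**
     * A broker that has not been unfenced is not publishing metadata yet,
     * so it skips the snapshot regardless of the delta.
     */
    static boolean shouldSnapshot(boolean unfenced, Optional<Map<String, Short>> featuresDelta) {
        return unfenced && featuresDelta.isPresent();
    }
}
```

For example, `featuresDelta(levels, levels)` is empty, so under this model a commit that leaves features untouched does not trigger a snapshot, even on an unfenced broker.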
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891910326

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
## @@ -836,7 +883,9 @@ public void deleteConnectorConfig(final String connName, final Callback configBackingStore.removeConnectorConfig(connName))) {
+throw new ConnectException("Failed to remove connector configuration from config topic since worker was fenced out");
+}
callback.onCompletion(null, new Created<>(false, null));

Review Comment:
On second thought, I think it's probably fine to leave things as they are without adding a manual invocation of `Callback::onCompletion` and a `return null`. Yes, `writeToConfigTopicAsLeader` may throw an exception, but so could writes to the config topic before changes for this KIP were made (such as [here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L835), [here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L894), and [here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L993)). If we were throwing an exception from within the body of the herder request instead of a method that the request invokes, it'd make sense to change that to instead be a manual invocation of the callback with the exception. But just calling a method that might throw an exception is different, and follows existing precedent in the code base without having to jump through special callback-related hoops.

-- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891910326

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
## @@ -836,7 +883,9 @@ public void deleteConnectorConfig(final String connName, final Callback configBackingStore.removeConnectorConfig(connName))) {
+throw new ConnectException("Failed to remove connector configuration from config topic since worker was fenced out");
+}
callback.onCompletion(null, new Created<>(false, null));

Review Comment:
On second thought, I think it's probably fine to leave things as they are without adding a manual invocation of `Callback::onCompletion` and a `return null`. Yes, `writeToConfigTopicAsLeader` may throw an exception, but so could writes to the config topic before changes for this KIP were made (such as [here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L835), [here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L894), and [here](https://github.com/apache/kafka/blob/8e205b503a39ba8c798e3ce14cd887b66a88551c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L993)). If we were throwing an exception from within the body of the herder request instead of a method that the request invokes, it'd make sense to change that to instead be a manual invocation of the callback with the exception. But just calling a method that might throw an exception is fine and follows existing precedent in the code base.
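The precedent the comment relies on can be illustrated with a minimal, hypothetical request runner. The sketch assumes, as the comment implies, that the herder catches an exception thrown while it executes a request and reports that exception through the request's callback. This covers an exception thrown by a method the request calls. `RequestRunner`, its nested `Request` holder, and this `Callback` interface are illustrative stand-ins. They are not the actual `DistributedHerder` code or Connect's own callback type.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Callable;

/** Hypothetical stand-in for a Connect-style completion callback. */
interface Callback<V> {
    void onCompletion(Throwable error, V result);
}

/**
 * Hypothetical herder-style runner: requests are queued and executed on one thread.
 * Any exception escaping a request's action, whether thrown directly in the action
 * or by a method it calls, is caught here and handed to that request's callback.
 */
final class RequestRunner {
    private static final class Request {
        final Callable<Void> action;
        final Callback<Void> callback;

        Request(Callable<Void> action, Callback<Void> callback) {
            this.action = action;
            this.callback = callback;
        }
    }

    private final Queue<Request> requests = new ArrayDeque<>();

    void addRequest(Callable<Void> action, Callback<Void> callback) {
        requests.add(new Request(action, callback));
    }

    /** Runs every queued request; failures are reported via callbacks, never rethrown. */
    void tick() {
        Request next;
        while ((next = requests.poll()) != null) {
            try {
                next.action.call();
            } catch (Throwable t) {
                next.callback.onCompletion(t, null);
            }
        }
    }
}
```

Under this assumption, a request body can call a helper that throws (for example, because the worker was fenced out). The body then needs to invoke the callback only on the success path, with no extra `try`/`catch` and `return null` inside the request itself.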
[GitHub] [kafka] C0urante commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)
C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r891907113

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
## @@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.source.SourceTask.TransactionBoundary;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
+import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
+import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.OffsetStorageWriter;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.LoggingContext;
+import org.apache.kafka.connect.util.TopicAdmin;
+import org.apache.kafka.connect.util.TopicCreationGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka, with support for exactly-once delivery guarantees.
+ */
+class ExactlyOnceWorkerSourceTask extends AbstractWorkerSourceTask {
+    private static final Logger log = LoggerFactory.getLogger(ExactlyOnceWorkerSourceTask.class);
+
+    private boolean transactionOpen;
+    private final LinkedHashMap commitableRecords;
+
+    private final TransactionManager transactionManager;
+    private final TransactionMetricsGroup transactionMetrics;
+
+    private final ConnectorOffsetBackingStore offsetBackingStore;
+    private final Runnable preProducerCheck;
+    private final Runnable postProducerCheck;
+
+    public ExactlyOnceWorkerSourceTask(ConnectorTaskId id,
+                                       SourceTask task,
+                                       TaskStatus.Listener statusListener,
+                                       TargetState initialState,
+                                       Converter keyConverter,
+                                       Converter valueConverter,
+                                       HeaderConverter headerConverter,
+                                       TransformationChain transformationChain,
+                                       Producer producer,
+                                       TopicAdmin admin,
+                                       Map topicGroups,
+                                       CloseableOffsetStorageReader offsetReader,
+                                       OffsetStorageWriter offsetWriter,
+                                       ConnectorOffsetBackingStore offsetBackingStore,
+                                       WorkerConfig workerConfig,
+                                       ClusterConfigState configState,
+                                       ConnectMetrics connectMetrics,
+                                       ClassLoader loader,
+                                       Time time,
+                                       RetryWithToleranceOperator retryWithToleranceOperator,
+                                       StatusBackingStore
[GitHub] [kafka] C0urante opened a new pull request, #12264: KAFKA-13967: Document guarantees for producer callbacks on transaction commit
C0urante opened a new pull request, #12264:
URL: https://github.com/apache/kafka/pull/12264

[Jira](https://issues.apache.org/jira/browse/KAFKA-13967)

Also added some `` tags to help organize the rendered Javadocs.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] C0urante commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)
C0urante commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r891901602

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
## @@ -0,0 +1,525 @@
[jira] [Created] (KAFKA-13967) Guarantees for producer callbacks on transaction commit should be documented
Chris Egerton created KAFKA-13967:
Summary: Guarantees for producer callbacks on transaction commit should be documented
Key: KAFKA-13967
URL: https://issues.apache.org/jira/browse/KAFKA-13967
Project: Kafka
Issue Type: Improvement
Components: clients
Reporter: Chris Egerton
Assignee: Chris Egerton

As discussed in https://github.com/apache/kafka/pull/11780#discussion_r891835221, part of the contract for a transactional producer is that all callbacks given to the producer will have been invoked and completed (either successfully or by throwing an exception) by the time that {{KafkaProducer::commitTransaction}} returns. This should be documented so that users of the clients library can have a guarantee that they're not on the hook to do that kind of bookkeeping themselves.
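The guarantee described in the issue can be made concrete with a toy model. `ToyTransactionalProducer` below is hypothetical and is not `KafkaProducer`. It runs each send's callback on a single background "I/O" thread. Its `commitTransaction()` blocks until every callback registered in the transaction has either returned or thrown, which is the property the issue asks to have documented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

/**
 * Toy model, not KafkaProducer: every callback passed to send() has been invoked and
 * has finished (normally or by throwing) by the time commitTransaction() returns.
 * Not thread-safe: call send() and commitTransaction() from a single application thread.
 */
final class ToyTransactionalProducer implements AutoCloseable {
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    private final List<Future<?>> pendingCallbacks = new ArrayList<>();

    /** Simulates an asynchronous send whose (always successful) ack fires the callback on the I/O thread. */
    void send(String record, BiConsumer<String, Exception> callback) {
        pendingCallbacks.add(ioThread.submit(() -> {
            try {
                callback.accept(record, null);
            } catch (RuntimeException e) {
                // A callback that throws still counts as completed; the commit is not affected.
            }
        }));
    }

    /** Waits for every callback registered in this transaction before returning. */
    void commitTransaction() {
        try {
            for (Future<?> f : pendingCallbacks) {
                f.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while committing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Unexpected failure on the I/O thread", e);
        } finally {
            pendingCallbacks.clear();
        }
    }

    @Override
    public void close() {
        ioThread.shutdown();
    }
}
```

With the real client, an application would pass an `org.apache.kafka.clients.producer.Callback` to `KafkaProducer.send` between `beginTransaction()` and `commitTransaction()`. Documenting the guarantee means such an application would not need bookkeeping like `pendingCallbacks` of its own.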
[GitHub] [kafka] hachikuji merged pull request #12258: MINOR: Convert `ReassignPartitionsIntegrationTest` to KRaft
hachikuji merged PR #12258:
URL: https://github.com/apache/kafka/pull/12258
[jira] [Created] (KAFKA-13966) Flaky test `QuorumControllerTest.testUnregisterBroker`
Jason Gustafson created KAFKA-13966:
Summary: Flaky test `QuorumControllerTest.testUnregisterBroker`
Key: KAFKA-13966
URL: https://issues.apache.org/jira/browse/KAFKA-13966
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson

We have seen the following assertion failure in `QuorumControllerTest.testUnregisterBroker`:
```
org.opentest4j.AssertionFailedError: expected: <2> but was: <0>
    at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
    at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161)
    at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:628)
    at org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker(QuorumControllerTest.java:494)
```
I reproduced it by running the test in a loop. It looks like what happens is that the BrokerRegistration request is able to get interleaved between the leader change event and the write of the bootstrap metadata. Something like this:
# handleLeaderChange() start
# appendWriteEvent(registerBroker)
# appendWriteEvent(bootstrapMetadata)
# handleLeaderChange() finish
# registerBroker() -> writes broker registration to log
# bootstrapMetadata() -> writes bootstrap metadata to log
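The interleaving listed above can be replayed deterministically with a small simulation. `EventOrderingDemo` is hypothetical and far simpler than `QuorumController`. It assumes only what the issue describes: write events are appended to a FIFO queue and executed one at a time, and each executed event appends a record to the log.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical simulation of a single-threaded controller event queue (not QuorumController). */
final class EventOrderingDemo {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private final List<String> log = new ArrayList<>();

    /** Enqueues a write event; the record reaches the log only when the event runs. */
    void appendWriteEvent(String record) {
        queue.add(() -> log.add(record));
    }

    /** Drains the queue in FIFO order, as a single event-handler thread would. */
    void runQueuedEvents() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
    }

    /** Replays the interleaving from the issue and returns the resulting log order. */
    static List<String> replayIssueInterleaving() {
        EventOrderingDemo controller = new EventOrderingDemo();
        // handleLeaderChange() has started but has not yet enqueued its bootstrap write,
        // so the broker registration request gets in first.
        controller.appendWriteEvent("broker registration");
        // handleLeaderChange() now enqueues the bootstrap metadata write, then finishes.
        controller.appendWriteEvent("bootstrap metadata");
        controller.runQueuedEvents();
        return controller.log;
    }
}
```

`replayIssueInterleaving()` returns `[broker registration, bootstrap metadata]`. The registration is written before the bootstrap metadata, which is the ordering the issue identifies as the likely cause of the flaky assertion.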
[jira] [Updated] (KAFKA-13966) Flaky test `QuorumControllerTest.testUnregisterBroker`
[ https://issues.apache.org/jira/browse/KAFKA-13966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-13966:

Description:
We have seen the following assertion failure in `QuorumControllerTest.testUnregisterBroker`:
{code:java}
org.opentest4j.AssertionFailedError: expected: <2> but was: <0>
    at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
    at org.junit.jupiter.api.AssertionUtils.failNotEqual(AssertionUtils.java:62)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:166)
    at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:161)
    at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:628)
    at org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker(QuorumControllerTest.java:494)
{code}
I reproduced it by running the test in a loop. It looks like what happens is that the BrokerRegistration request is able to get interleaved between the leader change event and the write of the bootstrap metadata. Something like this:
# handleLeaderChange() start
# appendWriteEvent(registerBroker)
# appendWriteEvent(bootstrapMetadata)
# handleLeaderChange() finish
# registerBroker() -> writes broker registration to log
# bootstrapMetadata() -> writes bootstrap metadata to log

> Flaky test `QuorumControllerTest.testUnregisterBroker`
>
> Key: KAFKA-13966
> URL: https://issues.apache.org/jira/browse/KAFKA-13966
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Priority: Major
[GitHub] [kafka] showuon commented on pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc
showuon commented on PR #12261:
URL: https://github.com/apache/kafka/pull/12261#issuecomment-1149358946

@ijuma , thanks for the comments. I've updated the PR.
[GitHub] [kafka] showuon commented on a diff in pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc
showuon commented on code in PR #12261:
URL: https://github.com/apache/kafka/pull/12261#discussion_r891842384

## docs/upgrade.html:
## @@ -213,6 +213,9 @@ Notable changes in 3
Notable changes in 3.0.0
+Java 8 and Scala 2.12 support have been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0.
+See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223;>KIP-750
+and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218;>KIP-751 for more details.

Review Comment:
Agree!
[GitHub] [kafka] hachikuji commented on a diff in pull request #11780: KAFKA-10000: Exactly-once source tasks (KIP-618)
hachikuji commented on code in PR #11780:
URL: https://github.com/apache/kafka/pull/11780#discussion_r891835221

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
## @@ -0,0 +1,525 @@
[jira] [Updated] (KAFKA-13965) Document broker-side socket-server-metrics
[ https://issues.apache.org/jira/browse/KAFKA-13965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

James Cheng updated KAFKA-13965:
Labels: newbie newbie++ (was: )

> Document broker-side socket-server-metrics
>
> Key: KAFKA-13965
> URL: https://issues.apache.org/jira/browse/KAFKA-13965
> Project: Kafka
> Issue Type: Improvement
> Components: documentation
> Affects Versions: 3.2.0
> Reporter: James Cheng
> Priority: Major
> Labels: newbie, newbie++
>
> There are a bunch of broker JMX metrics in the "socket-server-metrics" space
> that are not documented on kafka.apache.org/documentation
>
> * _MBean_: kafka.server:{{type=socket-server-metrics,listener=,networkProcessor=}}
> ** From KIP-188: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks]
> * kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener={listenerName}
> ** From KIP-612: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
> It would be helpful to get all the socket-server-metrics documented
[jira] [Created] (KAFKA-13965) Document broker-side socket-server-metrics
James Cheng created KAFKA-13965:
Summary: Document broker-side socket-server-metrics
Key: KAFKA-13965
URL: https://issues.apache.org/jira/browse/KAFKA-13965
Project: Kafka
Issue Type: Improvement
Components: documentation
Affects Versions: 3.2.0
Reporter: James Cheng

There are a bunch of broker JMX metrics in the "socket-server-metrics" space that are not documented on kafka.apache.org/documentation

* _MBean_: kafka.server:{{type=socket-server-metrics,listener=,networkProcessor=}}
** From KIP-188: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-188+-+Add+new+metrics+to+support+health+checks]
* kafka.server:type=socket-server-metrics,name=connection-accept-rate,listener={listenerName}
** From KIP-612: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]

It would be helpful to get all the socket-server-metrics documented
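Until these metrics are documented, the MBeans can be discovered with the standard JMX API. The sketch below is a minimal example and is not part of Kafka. Only the `kafka.server` domain and the `type=socket-server-metrics` key come from the issue. The listener name `PLAINTEXT` and the processor id used when exercising it are assumptions.

```java
import java.io.IOException;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Sketch: locate socket-server-metrics MBeans with a JMX property-list pattern. */
final class SocketServerMetricsQuery {
    private SocketServerMetricsQuery() {}

    /** Matches every kafka.server MBean whose type key is socket-server-metrics, whatever other keys it has. */
    static ObjectName pattern() {
        return parse("kafka.server:type=socket-server-metrics,*");
    }

    /** Lists the matching MBeans on a connection, e.g. one obtained through JMXConnectorFactory. */
    static Set<ObjectName> list(MBeanServerConnection connection) throws IOException {
        return connection.queryNames(pattern(), null);
    }

    /** True if an MBean with this name would be returned by list(). */
    static boolean matches(String objectName) {
        return pattern().apply(parse(objectName));
    }

    private static ObjectName parse(String name) {
        try {
            return new ObjectName(name);
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Not a valid JMX ObjectName: " + name, e);
        }
    }
}
```

To query a remote broker that has JMX enabled, obtain a connection with `JMXConnectorFactory.connect(new JMXServiceURL("service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi")).getMBeanServerConnection()` and pass it to `list(...)`. The host and port depend on the deployment.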
[GitHub] [kafka] hachikuji merged pull request #12086: Minor : cleanup.policy is a comma separated list
hachikuji merged PR #12086: URL: https://github.com/apache/kafka/pull/12086 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13942) LogOffsetTest occasionally hangs during Jenkins build
[ https://issues.apache.org/jira/browse/KAFKA-13942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-13942. - Resolution: Fixed > LogOffsetTest occasionally hangs during Jenkins build > - > > Key: KAFKA-13942 > URL: https://issues.apache.org/jira/browse/KAFKA-13942 > Project: Kafka > Issue Type: Bug > Components: unit tests >Reporter: David Arthur >Priority: Minor > > [~hachikuji] parsed the log output of one of the recent stalled Jenkins > builds and singled out LogOffsetTest as a likely culprit for not completing. > I looked closely at the following build which appeared to be stuck and found > this test case had STARTED but not PASSED or FAILED. > 15:19:58 LogOffsetTest > > testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(String) > > kafka.server.LogOffsetTest.testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(String)[2] > STARTED
[GitHub] [kafka] hachikuji merged pull request #12262: KAFKA-13942: Fix kraft timeout in LogOffsetTest
hachikuji merged PR #12262: URL: https://github.com/apache/kafka/pull/12262
[GitHub] [kafka] d-t-w commented on pull request #12260: MINOR: add note on IDEMPOTENT_WRITE ACL to 3.2.0 notable changes
d-t-w commented on PR #12260: URL: https://github.com/apache/kafka/pull/12260#issuecomment-1149320749 Hi @ijuma I rm'd the `ul` tag. That format was originally requested in the comments here: https://issues.apache.org/jira/browse/KAFKA-13598 More info here: https://kpow.io/articles/kafka-producer-breaking-change/ I'm not sure including this line in the previous `li` is a good idea, since that mixes this note into the context of a statement about Connect. This breaking change impacts any user of Kafka where: 1. The Kafka cluster has brokers running version < 2.8.0, and 2. The Kafka cluster has ACLs configured, but not IDEMPOTENT_WRITE, and 3. The producer configuration is default, or capable of being defaulted to idempotent, and 4. The producing application is using Kafka-Clients version >= 3.2.0
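The four conditions above can be read as a single predicate. The following is a minimal sketch using a hypothetical helper, `IdempotentWriteAclCheck`, that is not part of Kafka. Condition 3 ("default, or capable of being defaulted to idempotent") is collapsed into one boolean meaning "idempotence ends up enabled". Client 3.2.0 itself is treated as affected, on the assumption that the note belongs to the 3.2.0 notable changes.

```java
// Hypothetical helper encoding the four conditions from the comment; not a Kafka API.
final class IdempotentWriteAclCheck {
    // Numeric comparison of plain dotted versions, e.g. "2.7.1" < "2.8.0" < "2.10.0".
    // Assumes purely numeric segments; suffixes such as "-SNAPSHOT" are not handled.
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        int n = Math.max(x.length, y.length);
        for (int i = 0; i < n; i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    static boolean isAffected(String brokerVersion,
                              boolean aclsConfigured,
                              boolean hasIdempotentWriteAcl,
                              boolean producerEndsUpIdempotent,
                              String clientVersion) {
        return compareVersions(brokerVersion, "2.8.0") < 0     // 1. old brokers
                && aclsConfigured && !hasIdempotentWriteAcl    // 2. ACLs without IDEMPOTENT_WRITE
                && producerEndsUpIdempotent                    // 3. idempotence on (explicitly or by default)
                && compareVersions(clientVersion, "3.2.0") >= 0; // 4. new clients
    }
}
```

If any single condition is false, the producer is not affected. For example, explicitly disabling idempotence in the producer config falsifies condition 3.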
[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward
[ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551333#comment-17551333 ] Matthias J. Sax commented on KAFKA-13963: - {quote}Is it worth updating the java doc to mention this? {quote} Updating docs can never hurt :) – are you interested in doing a PR? {quote}if you use the internal RecordCollector, which I feel should be better hidden from the streams api users {quote} Yes, you should NEVER use internal stuff... Not sure how we could "better hide" it though? Seems not to be possible as long as we are using Java 8... {quote}I can open up a separate bug for that if it makes sense. {quote} Don't think it's a bug? It's (unfortunately) how Java works. > Topology Description ignores context.forward > > > Key: KAFKA-13963 > URL: https://issues.apache.org/jira/browse/KAFKA-13963 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.2 >Reporter: Tomasz Kaszuba >Priority: Minor > > I have a simple topology: > {code:java} > val topology = new Topology > topology > .addSource("source", Serdes.stringSerde.deserializer, > Serdes.stringSerde.deserializer, inputTopic) > .addProcessor( > "process", > new ProcessorSupplier[String, String] { > override def get(): Processor[String, String] = > new RecordCollectorProcessor() > }, > "source" > ) {code} > And a simple processor that uses context.forward to forward messages: > {code:java} > private class ContextForwardProcessor extends AbstractProcessor[String, > String]() { override def process(key: String, value: String): Unit = > context().forward("key", "value", To.child("output")) override def > close(): Unit = () > } {code} > when I call topology.describe() I receive this: > {noformat} > Topologies: > Sub-topology: 0 > Source: source (topics: [input]) > --> process > Processor: process (stores: []) > --> none > <-- source {noformat} > Ignoring the fact that this will not run since it will throw a runtime > exception why is the 
To.child ignored? > Taking it one point further if I add multiple sinks to the topology like so: > {code:java} > val topology = new Topology > topology > .addSource("source", Serdes.stringSerde.deserializer, > Serdes.stringSerde.deserializer, inputTopic) > .addProcessor( > "process", > new ProcessorSupplier[String, String] { > override def get(): Processor[String, String] = > new ContextForwardProcessor() > }, > "source" > ) > .addSink("sink", "output1", Serdes.stringSerde.serializer(), > Serdes.stringSerde.serializer(), "process") > .addSink("sink2", "output2", Serdes.stringSerde.serializer(), > Serdes.stringSerde.serializer(), "process") {code} > but have the processor only output to "output1" it is in no way reflected in > the described topology graph. > I assume this is by design since it's a lot more work to interpret what the > context.forward is doing but when I tried to look for this information in the > java doc I couldn't find it.
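The behaviour discussed in this ticket follows from where the description comes from. It is built from the nodes and edges declared with `addSource`/`addProcessor`/`addSink`, not from what `process()` does at run time. The toy model below is plain Java and deliberately is NOT the Kafka Streams API; `ToyTopology` and its methods are hypothetical. It reproduces both observations from the report: an undeclared `To.child("output")` target never appears in the description and fails only when forwarded to, and every declared sink edge appears whether or not it is used.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT the Kafka Streams API: nodes plus parent -> child edges declared up front.
final class ToyTopology {
    // Insertion-ordered so describe() output is stable.
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Analogue of addSource/addProcessor/addSink: declares a node and its parents.
    ToyTopology addNode(String name, String... parents) {
        for (String parent : parents) {
            if (!children.containsKey(parent)) {
                throw new IllegalArgumentException("Unknown parent: " + parent);
            }
        }
        children.putIfAbsent(name, new ArrayList<>());
        for (String parent : parents) {
            children.get(parent).add(name);
        }
        return this;
    }

    // Analogue of describe(): derived purely from the declared edges.
    String describe() {
        StringBuilder sb = new StringBuilder();
        children.forEach((node, kids) -> sb.append(node)
                .append(" --> ")
                .append(kids.isEmpty() ? "none" : String.join(", ", kids))
                .append('\n'));
        return sb.toString();
    }

    // Analogue of forward(..., To.child(name)): checked only when it runs, against
    // the declared edges; it never adds anything to the description.
    void forward(String from, String child) {
        if (!children.getOrDefault(from, Collections.emptyList()).contains(child)) {
            throw new IllegalStateException("Unknown downstream node " + child + " of " + from);
        }
    }
}
```

With `source` and `process` declared, `describe()` reports `process --> none`, and forwarding to `output` throws. After declaring two sinks under `process`, both edges are listed even if the processor only ever forwards to one of them.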
[jira] [Updated] (KAFKA-10000) Atomic commit of source connector records and offsets
[ https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-10000: -- Priority: Blocker (was: Major) > Atomic commit of source connector records and offsets > - > > Key: KAFKA-10000 > URL: https://issues.apache.org/jira/browse/KAFKA-10000 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Blocker > Labels: needs-kip > Fix For: 3.3.0 > > > It'd be nice to be able to configure source connectors such that their > offsets are committed if and only if all records up to that point have been > ack'd by the producer. This would go a long way towards EOS for source > connectors. > > This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is > marked as {{WONTFIX}} since it only concerns enabling the idempotent producer > for source connectors and is not concerned with source connector offsets. > This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, > which had a lot of discussion around allowing connector-defined transaction > boundaries. The suggestion in this ticket is to only use source connector > offset commits as the transaction boundaries for connectors; allowing > connector-specified transaction boundaries can be addressed separately.
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r891811716 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -309,16 +355,70 @@ public void start() { @Override public void stop() { log.info("Closing KafkaConfigBackingStore"); -try { -configLog.stop(); -} finally { -if (ownTopicAdmin != null) { -ownTopicAdmin.close(); -} + +if (fencableProducer != null) { +Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic"); } +Utils.closeQuietly(ownTopicAdmin, "admin for config topic"); +Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic"); + log.info("Closed KafkaConfigBackingStore"); } +@Override +public void claimWritePrivileges() { +if (usesFencableWriter && fencableProducer == null) { +try { +fencableProducer = createFencableProducer(); +fencableProducer.initTransactions(); +} catch (Exception e) { +if (fencableProducer != null) { +Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic"); +fencableProducer = null; +} +throw new ConnectException("Failed to create and initialize fencable producer for config topic", e); +} +} +} + +private Map baseProducerProps(WorkerConfig workerConfig) { +Map producerProps = new HashMap<>(workerConfig.originals()); +String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(workerConfig); +producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); +producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); +producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); +ConnectUtils.addMetricsContextProperties(producerProps, workerConfig, kafkaClusterId); +return producerProps; +} + +// Visible for testing +Map fencableProducerProps(DistributedConfig workerConfig) { +Map result = new HashMap<>(baseProducerProps(workerConfig)); + +// 
Always require producer acks to all to ensure durable writes +result.put(ProducerConfig.ACKS_CONFIG, "all"); +// Don't allow more than one in-flight request to prevent reordering on retry (if enabled) +result.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); Review Comment: Rebase complete; should be resolved now. ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -291,7 +328,16 @@ public void start() { log.info("Starting KafkaConfigBackingStore"); // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that // updates can continue to occur in the background -configLog.start(); +try { +configLog.start(); +} catch (UnsupportedVersionException e) { +throw new ConnectException( +"Enabling exactly-once support for source connectors requires a Kafka broker version that allows " ++ "admin clients to read consumer offsets. Disable the worker's exactly-once support " ++ "for source connectors, or use a new Kafka broker version.", Review Comment: Rebase complete; should be resolved now. ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -784,6 +845,14 @@ private static Map connectorClientConfigOverrides(ConnectorTaskI return clientOverrides; } +private String transactionalId(ConnectorTaskId id) { Review Comment: Rebase complete; should be resolved now. 
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -320,6 +320,18 @@ public void putTaskConfigs(final @PathParam("connector") String connector, completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward); } +@PUT +@Path("/{connector}/fence") +public Response fenceZombies(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final byte[] requestBody) throws Throwable { +FutureCallback cb = new FutureCallback<>(); +herder.fenceZombies(connector, cb, InternalRequestSignature.fromHeaders(requestBody, headers)); +completeOrForwardRequest(cb,
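The new `claimWritePrivileges()` follows a common shape: lazily create a resource, initialize it, and if anything fails, close it quietly, leave the field unset, and rethrow a wrapped exception. The following is a dependency-free sketch of that shape. `LazyInitializedResource` and `Initializer` are hypothetical names, and `IllegalStateException` stands in for `ConnectException`. One deliberate difference from the diff is that the field is published only after initialization succeeds, so it never has to be reset to `null`.

```java
import java.util.function.Supplier;

// Hypothetical helper mirroring the shape of claimWritePrivileges(); not a Connect API.
final class LazyInitializedResource<R extends AutoCloseable> {
    // Initialization step that may fail, e.g. initTransactions() on a producer.
    interface Initializer<T> {
        void init(T resource) throws Exception;
    }

    private final Supplier<R> factory;
    private final Initializer<R> initializer;
    private R resource; // non-null only after a successful claim()

    LazyInitializedResource(Supplier<R> factory, Initializer<R> initializer) {
        this.factory = factory;
        this.initializer = initializer;
    }

    synchronized R claim() {
        if (resource != null) {
            return resource; // already claimed, like the null check in the diff
        }
        R created = null;
        try {
            created = factory.get();
            initializer.init(created);
            resource = created; // publish only after init succeeded
            return created;
        } catch (Exception e) {
            closeQuietly(created);
            throw new IllegalStateException("Failed to create and initialize resource", e);
        }
    }

    // Close and swallow any error, similar in spirit to Kafka's Utils.closeQuietly.
    private static void closeQuietly(AutoCloseable closeable) {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Exception ignored) {
            // Intentionally ignored: the caller is already failing with a more useful exception.
        }
    }
}
```

Using a local variable until initialization succeeds keeps the "claimed" state and the "successfully initialized" state identical. This is why a failed claim can simply be retried later.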
[GitHub] [kafka] hachikuji commented on a diff in pull request #12250: KAFKA-13935 Fix static usages of IBP in KRaft mode
hachikuji commented on code in PR #12250: URL: https://github.com/apache/kafka/pull/12250#discussion_r891771090 ## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ## @@ -106,7 +110,8 @@ class RemoteLeaderEndPoint(logPrefix: String, .setPartitionIndex(topicPartition.partition) .setCurrentLeaderEpoch(currentLeaderEpoch) .setTimestamp(earliestOrLatest))) -val requestBuilder = ListOffsetsRequest.Builder.forReplica(brokerConfig.listOffsetRequestVersion, brokerConfig.brokerId) +val meteadataVersion = metadataVersionSupplier() Review Comment: nit: typo `meteadataVersion`
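The diff replaces `brokerConfig.listOffsetRequestVersion`, which is fixed when the config is built, with a call to `metadataVersionSupplier()` each time a request is built. Presumably this is because in KRaft mode the metadata version can change while the broker runs, so a value captured once from static config can go stale. The following is a hypothetical illustration of that difference. None of these classes exist in Kafka, and the version strings are made up.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical illustration: capturing a version once vs. reading it through a Supplier.
final class VersionCaptureDemo {
    // Stand-in for a value that can change while the process runs.
    static final class CurrentVersion {
        private final AtomicReference<String> value;
        CurrentVersion(String initial) { value = new AtomicReference<>(initial); }
        void update(String v) { value.set(v); }
        String get() { return value.get(); }
    }

    // Captures the version once at construction; later updates are never seen.
    static final class StaticEndpoint {
        private final String version;
        StaticEndpoint(CurrentVersion current) { version = current.get(); }
        String versionForRequest() { return version; }
    }

    // Reads the version every time a request is built.
    static final class SupplierEndpoint {
        private final Supplier<String> versionSupplier;
        SupplierEndpoint(Supplier<String> versionSupplier) { this.versionSupplier = versionSupplier; }
        String versionForRequest() { return versionSupplier.get(); }
    }
}
```

After an update, the static endpoint keeps reporting the old value, while the supplier-based one picks up the new value on its next request.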
[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
hachikuji commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r891769739 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4321,6 +4328,88 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } +@Override +public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { +NodeProvider provider = new LeastLoadedNodeProvider(); + +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final Call call = new Call( +"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + +private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { +List voters = new ArrayList<>(); +List observers = new ArrayList<>(); +partition.currentVoters().forEach(v -> { +voters.add(new QuorumInfo.ReplicaState(v.replicaId(), +v.logEndOffset(), +v.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastFetchTimestamp()), +v.lastCaughtUpTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(v.lastCaughtUpTimestamp(; +}); +partition.observers().forEach(o -> { +observers.add(new QuorumInfo.ReplicaState(o.replicaId(), +o.logEndOffset(), +o.lastFetchTimestamp() == -1 ? OptionalLong.empty() : OptionalLong.of(o.lastFetchTimestamp()), +o.lastCaughtUpTimestamp() == -1 ? 
OptionalLong.empty() : OptionalLong.of(o.lastCaughtUpTimestamp(; +}); +QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers); +return info; +} + +@Override +DescribeQuorumRequest.Builder createRequest(int timeoutMs) { +return new Builder(DescribeQuorumRequest.singletonRequest( +new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition(; +} + +@Override +void handleResponse(AbstractResponse response) { +final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; +if (quorumResponse.data().errorCode() != Errors.NONE.code()) { +throw Errors.forCode(quorumResponse.data().errorCode()).exception(); +} +if (quorumResponse.data().topics().size() > 1) { Review Comment: Maybe we should check if size is not equal to 1 here and below. I guess an empty list is also possible.
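Two small pieces of logic recur in this diff: mapping the response's `-1` timestamp sentinel to an empty `OptionalLong`, and the reviewer's suggestion to require exactly one topic (and partition) rather than only rejecting more than one. The following sketch uses the hypothetical helpers `fromSentinel` and `requireSingle`. `IllegalStateException` is an assumed stand-in for whatever error the admin client actually raises.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helpers; not part of KafkaAdminClient.
final class QuorumResponseHelpers {
    // The response uses -1 for "unknown"; expose that as an empty OptionalLong.
    static OptionalLong fromSentinel(long value) {
        return value == -1 ? OptionalLong.empty() : OptionalLong.of(value);
    }

    // Per the review: anything other than exactly one element is an error,
    // which also rejects an empty list (a "> 1" check would let it through).
    static <T> T requireSingle(List<T> items, String what) {
        if (items.size() != 1) {
            throw new IllegalStateException("Expected exactly one " + what + " but got " + items.size());
        }
        return items.get(0);
    }
}
```

The `!= 1` check covers both failure cases in one place. An empty `topics()` list would otherwise fail later with a less informative `IndexOutOfBoundsException`.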
[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
hachikuji commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r891768433 ## clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java: ## @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; + +/** + * This is used to describe per-partition state in the DescribeQuorumResponse. Review Comment: Yeah, I agree with this. Maybe we can just say that this class contains useful debugging state for KRaft replication.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
hachikuji commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r891766251 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4321,6 +4330,73 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } +@Override +public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { +NodeProvider provider = new LeastLoadedNodeProvider(); + +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final Call call = new Call( +"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + +private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) { +Integer partition = 0; +String topicName = response.getTopicNameByIndex(0); +Integer leaderId = response.getPartitionLeaderId(topicName, partition); +List voters = new ArrayList<>(); +List observers = new ArrayList<>(); +response.getVoterInfo(topicName, partition).forEach(v -> { +voters.add(new QuorumInfo.ReplicaState(v.replicaId(), +v.logEndOffset(), +OptionalLong.of(v.lastFetchTimestamp()), +OptionalLong.of(v.lastCaughtUpTimestamp(; +}); +response.getObserverInfo(topicName, partition).forEach(o -> { +observers.add(new QuorumInfo.ReplicaState(o.replicaId(), +o.logEndOffset(), +OptionalLong.of(o.lastFetchTimestamp()), +OptionalLong.of(o.lastCaughtUpTimestamp(; +}); +QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers); +return info; +} + +@Override +DescribeQuorumRequest.Builder createRequest(int timeoutMs) { Review Comment: Never mind, this is an override.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12181: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2)
hachikuji commented on code in PR #12181: URL: https://github.com/apache/kafka/pull/12181#discussion_r891635253 ## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ## @@ -915,15 +918,21 @@ Set imbalancedPartitions() { return new HashSet<>(imbalancedPartitions); } -ControllerResult alterPartition(AlterPartitionRequestData request) { +ControllerResult alterPartition( +ControllerRequestContext context, +AlterPartitionRequestData request +) { clusterControl.checkBrokerEpoch(request.brokerId(), request.brokerEpoch()); AlterPartitionResponseData response = new AlterPartitionResponseData(); List records = new ArrayList<>(); for (AlterPartitionRequestData.TopicData topicData : request.topics()) { AlterPartitionResponseData.TopicData responseTopicData = -new AlterPartitionResponseData.TopicData().setName(topicData.name()); +new AlterPartitionResponseData.TopicData(). +setTopicName(topicData.topicName()). +setTopicId(topicData.topicId()); response.topics().add(responseTopicData); -Uuid topicId = topicsByName.get(topicData.name()); +Uuid topicId = topicData.topicId().equals(Uuid.ZERO_UUID) ? +topicsByName.get(topicData.topicName()) : topicData.topicId(); Review Comment: This wasn't covered in the KIP, but when we cannot find the provided `TopicId`, should we return `UNKNOWN_TOPIC_ID`? ## clients/src/main/java/org/apache/kafka/common/errors/IneligibleReplica.java: ## @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class IneligibleReplica extends ApiException { Review Comment: nit: I think the usual convention is to add the "Exception" suffix ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -167,7 +168,8 @@ sealed trait PendingPartitionChange extends PartitionState { case class PendingExpandIsr( isr: Set[Int], newInSyncReplicaId: Int, - sentLeaderAndIsr: LeaderAndIsr + sentLeaderAndIsr: LeaderAndIsr, + partitionStateToRollBackTo: PartitionState Review Comment: nit: Wonder if we could use a more concise name. Maybe `priorState` or `lastCommittedState`? 
## core/src/main/scala/kafka/controller/KafkaController.scala: ## @@ -2225,194 +2223,210 @@ class KafkaController(val config: KafkaConfig, } } - def alterPartitions(alterPartitionRequest: AlterPartitionRequestData, callback: AlterPartitionResponseData => Unit): Unit = { -val partitionsToAlter = mutable.Map[TopicPartition, LeaderAndIsr]() - -alterPartitionRequest.topics.forEach { topicReq => - topicReq.partitions.forEach { partitionReq => -partitionsToAlter.put( - new TopicPartition(topicReq.name, partitionReq.partitionIndex), - LeaderAndIsr( -alterPartitionRequest.brokerId, -partitionReq.leaderEpoch, -partitionReq.newIsr().asScala.toList.map(_.toInt), -LeaderRecoveryState.of(partitionReq.leaderRecoveryState), -partitionReq.partitionEpoch - ) -) - } -} - -def responseCallback(results: Either[Map[TopicPartition, Either[Errors, LeaderAndIsr]], Errors]): Unit = { - val resp = new AlterPartitionResponseData() - results match { -case Right(error) => - resp.setErrorCode(error.code) -case Left(partitionResults) => - resp.setTopics(new util.ArrayList()) - partitionResults -.groupBy { case (tp, _) => tp.topic } // Group by topic -.foreach { case (topic, partitions) => - // Add each topic part to the response - val topicResp = new AlterPartitionResponseData.TopicData() -.setName(topic) -.setPartitions(new util.ArrayList()) - resp.topics.add(topicResp) - partitions.foreach { case (tp, errorOrIsr) => -// Add each partition part to the response (new ISR or error) -errorOrIsr
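The new lookup in `alterPartition` prefers the request's topic ID and falls back to the name when the ID is the zero UUID. The review asks what should happen when a provided ID is unknown. The following sketch of the resolution step makes "not found" explicit so the caller can map it to an error such as `UNKNOWN_TOPIC_ID`. `TopicIdResolver` is hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and the all-zero UUID stands in for `Uuid.ZERO_UUID`. The `topicsById` map is also an assumption made for the existence check.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical sketch; java.util.UUID stands in for Kafka's Uuid.
final class TopicIdResolver {
    // Stand-in for Uuid.ZERO_UUID, meaning "no topic ID in the request".
    static final UUID ZERO = new UUID(0L, 0L);

    // Prefer the topic ID when present; otherwise fall back to the name (older request versions).
    // Optional.empty() means "unknown"; for the ID path the review suggests UNKNOWN_TOPIC_ID.
    static Optional<UUID> resolve(UUID requestedId, String requestedName,
                                  Map<String, UUID> topicsByName, Map<UUID, String> topicsById) {
        if (!ZERO.equals(requestedId)) {
            return topicsById.containsKey(requestedId) ? Optional.of(requestedId) : Optional.empty();
        }
        return Optional.ofNullable(topicsByName.get(requestedName));
    }
}
```

Unlike the ternary in the diff, this version checks that a supplied ID actually exists before using it. An unknown ID therefore surfaces as an explicit "not found" rather than flowing further into the handler.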
[jira] [Comment Edited] (KAFKA-13963) Topology Description ignores context.forward
[ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551289#comment-17551289 ] Tomasz Kaszuba edited comment on KAFKA-13963 at 6/7/22 9:07 PM: Ok, this is what I thought. Is it worth updating the java doc to mention this? The developers I work with were surprised that context.forward is not covered. We rely heavily on the generated topology graphs for impact analysis. Btw, I think you can get around the context forward exception and the need for registering sinks if you use the internal RecordCollector, which I feel should be better hidden from the streams api users since it's a class cast exception waiting to happen. I can open up a separate bug for that if it makes sense. {code:java} collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector {code} was (Author: tkaszuba): Ok, this is what I thought. Is it worth updating the java doc to mention this? The developers I work with were surprised that context.forward is not covered. We really heavily on the generated topology graphs for impact analysis. Btw, I think you can get around the context forward exception and the need for registering sinks if you use the internal RecordCollector, which I feel should be better hidden from the streams api users since it's a class cast exception waiting to happen. I can open up a separate bug for that if it makes sense. 
{code:java} collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector {code}
[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward
[ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551289#comment-17551289 ] Tomasz Kaszuba commented on KAFKA-13963: Ok, this is what I thought. Is it worth updating the java doc to mention this? The developers I work with were surprised that context.forward is not covered. We rely heavily on the generated topology graphs for impact analysis. Btw, I think you can get around the context forward exception and the need for registering sinks if you use the internal RecordCollector, which I feel should be better hidden from the streams api users since it's a class cast exception waiting to happen. I can open up a separate bug for that if it makes sense. {code:java} collector = context.asInstanceOf[RecordCollector.Supplier].recordCollector {code}
[jira] [Assigned] (KAFKA-13898) metrics.recording.level is underdocumented
[ https://issues.apache.org/jira/browse/KAFKA-13898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Joerger reassigned KAFKA-13898: --- Assignee: Richard Joerger > metrics.recording.level is underdocumented > -- > > Key: KAFKA-13898 > URL: https://issues.apache.org/jira/browse/KAFKA-13898 > Project: Kafka > Issue Type: Improvement > Components: docs >Reporter: Tom Bentley >Assignee: Richard Joerger >Priority: Minor > Labels: newbie > > metrics.recording.level is only briefly described in the documentation. In > particular the recording level associated with each metric is not documented, > which makes it difficult to know the effect of changing the level.
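To illustrate why the per-metric level matters, the following is a simplified model of how a recording level gates metrics. It assumes three levels ordered INFO < DEBUG < TRACE, and a metric tagged with some level is recorded only when the configured `metrics.recording.level` is at least as verbose. The enum `ToyRecordingLevel` is hypothetical and is not Kafka's `Sensor.RecordingLevel`. Treat the ordering as an assumption to check against the Kafka source rather than as documented behaviour.

```java
import java.util.Locale;

// Simplified model, not Kafka's own class; assumes levels ordered INFO < DEBUG < TRACE.
enum ToyRecordingLevel {
    INFO, DEBUG, TRACE; // declaration order = least to most verbose

    // Parse a metrics.recording.level value such as "debug".
    static ToyRecordingLevel fromConfig(String value) {
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    // A metric tagged with this level is recorded when the configured level is at least as verbose.
    boolean shouldRecord(ToyRecordingLevel configured) {
        return ordinal() <= configured.ordinal();
    }
}
```

Under this model, raising the config from INFO to DEBUG starts recording every DEBUG-tagged metric as well. This is exactly the effect the ticket says users cannot predict without a per-metric level in the docs.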
[jira] [Commented] (KAFKA-13963) Topology Description ignores context.forward
[ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551271#comment-17551271 ] Matthias J. Sax commented on KAFKA-13963: - TopologyDescription only describes the structure of you graph of operators. In your first example, you only added two nodes to the graph ("source" and "process") and there is no node "output", and thus it's not contained in the `TopologyDescription`. It's not really possible to take the business logic (ie, what `forward()` is doing) into account – at least I have not idea how this could be done with reasonable effort. It's for sure not a bug. We should either close this ticket and change it into a feature request. > Topology Description ignores context.forward > > > Key: KAFKA-13963 > URL: https://issues.apache.org/jira/browse/KAFKA-13963 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.2 >Reporter: Tomasz Kaszuba >Priority: Minor > > I have a simple topology: > {code:java} > val topology = new Topology > topology > .addSource("source", Serdes.stringSerde.deserializer, > Serdes.stringSerde.deserializer, inputTopic) > .addProcessor( > "process", > new ProcessorSupplier[String, String] { > override def get(): Processor[String, String] = > new RecordCollectorProcessor() > }, > "source" > ) {code} > And a simple processor that uses context.forward to forward messages: > {code:java} > private class ContextForwardProcessor extends AbstractProcessor[String, > String]() { override def process(key: String, value: String): Unit = > context().forward("key", "value", To.child("output")) override def > close(): Unit = () > } {code} > when I call topology.describe() I receive this: > {noformat} > Topologies: > Sub-topology: 0 > Source: source (topics: [input]) > --> process > Processor: process (stores: []) > --> none > <-- source {noformat} > Ignoring the fact that this will not run since it will throw a runtime > exception why is the To.child ignored? 
> Taking it one point further if I add multiple sinks to the topology like so: > {code:java} > val topology = new Topology > topology > .addSource("source", Serdes.stringSerde.deserializer, > Serdes.stringSerde.deserializer, inputTopic) > .addProcessor( > "process", > new ProcessorSupplier[String, String] { > override def get(): Processor[String, String] = > new ContextForwardProcessor() > }, > "source" > ) > .addSink("sink", "output1", Serdes.stringSerde.serializer(), > Serdes.stringSerde.serializer(), "process") > .addSink("sink2", "output2", Serdes.stringSerde.serializer(), > Serdes.stringSerde.serializer(), "process") {code} > but have the processor only output to "output1" it is in no way reflected in > the described topology graph. > I assume this is by design since it's a lot more work to interpret what the > context.forward is doing but when I tried to look for this information in the > java doc I couldn't find it. > -- This message was sent by Atlassian Jira (v8.20.7#820007)
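A toy model makes Matthias's distinction concrete: `describe()` walks only the declared nodes and edges, while a `forward(To.child(...))` call is resolved at runtime against those same declared edges. `ToyTopology` below is hypothetical and dependency-free; it is not the Kafka Streams API and only mimics that split.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of a processor graph (NOT Kafka Streams). describe()
 * reports declared structure only; it never inspects processor logic.
 */
final class ToyTopology {
    // node name -> declared child names, in insertion order
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    ToyTopology addNode(String name, String... parents) {
        children.putIfAbsent(name, new ArrayList<>());
        for (String parent : parents) {
            List<String> kids = children.get(parent);
            if (kids == null) {
                throw new IllegalArgumentException("Unknown parent: " + parent);
            }
            kids.add(name);
        }
        return this;
    }

    /** Lists only declared edges, analogous to TopologyDescription. */
    String describe() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, List<String>> e : children.entrySet()) {
            List<String> kids = e.getValue();
            sb.append(e.getKey())
              .append(" --> ")
              .append(kids.isEmpty() ? "none" : String.join(", ", kids))
              .append('\n');
        }
        return sb.toString();
    }

    /** Forwarding to an undeclared child fails at runtime, not at describe() time. */
    void forward(String from, String toChild) {
        List<String> kids = children.get(from);
        if (kids == null || !kids.contains(toChild)) {
            throw new IllegalStateException(
                "Unknown child '" + toChild + "' for node '" + from + "'");
        }
    }
}
```

In the real API, the counterpart of `addNode("output", "process")` would be registering a node named `output` with `process` as its parent, for example via `Topology#addSink(name, topic, parentNames...)`; only then does that edge appear in the description.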
[jira] [Created] (KAFKA-13964) kafka-configs.sh end with UnsupportedVersionException when describing TLS user with quotas
Jakub Stejskal created KAFKA-13964: -- Summary: kafka-configs.sh end with UnsupportedVersionException when describing TLS user with quotas Key: KAFKA-13964 URL: https://issues.apache.org/jira/browse/KAFKA-13964 Project: Kafka Issue Type: Bug Components: admin, kraft Affects Versions: 3.2.0 Environment: Kafka 3.2.0 running on OpenShift 4.10 in KRaft mode managed by Strimzi Reporter: Jakub Stejskal Running kafka-configs.sh ends with org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_USER_SCRAM_CREDENTIALS when describing a TLS user with quotas enabled. {code:java} bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --user CN=encrypted-arnost` got status code 1 and stderr: -- Error while executing config command with args '--bootstrap-server localhost:9092 --describe --user CN=encrypted-arnost' java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support DESCRIBE_USER_SCRAM_CREDENTIALS{code} STDOUT contains all the necessary data, but the script itself exits with return code 1 and the error above. SCRAM-SHA has not been configured anywhere in this case (it is not supported by KRaft). This might be fixed by adding SCRAM-SHA support in the next version (the issue is not reproducible without KRaft enabled).
[GitHub] [kafka] hachikuji commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
hachikuji commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r891613523 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } +@Override +public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { +NodeProvider provider = new LeastLoadedNodeProvider(); + +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final Call call = new Call( +"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + +private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { +List voters = new ArrayList<>(); +List observers = new ArrayList<>(); +partition.currentVoters().forEach(v -> { +voters.add(new QuorumInfo.ReplicaState(v.replicaId(), +v.logEndOffset(), +OptionalLong.of(v.lastFetchTimestamp()), +OptionalLong.of(v.lastCaughtUpTimestamp(; +}); +partition.observers().forEach(o -> { +observers.add(new QuorumInfo.ReplicaState(o.replicaId(), +o.logEndOffset(), +OptionalLong.of(o.lastFetchTimestamp()), +OptionalLong.of(o.lastCaughtUpTimestamp(; +}); +QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers); +return info; +} + +@Override +DescribeQuorumRequest.Builder createRequest(int timeoutMs) { +return new Builder(DescribeQuorumRequest.singletonRequest( +new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition(; +} + +@Override +void handleResponse(AbstractResponse response) { +final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; +try { +if (quorumResponse.data().errorCode() != Errors.NONE.code()) { +throw Errors.forCode(quorumResponse.data().errorCode()).exception(); +} +if (quorumResponse.data().topics().size() > 1) { +String msg = String.format("DescribeMetadataQuorum received {} 
topics when 1 was expected", +quorumResponse.data().topics().size()); +log.debug(msg); +throw new UnknownServerException(msg); +} +DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0); +if (!topic.topicName().equals(METADATA_TOPIC_NAME)) { +String msg = String.format("DescribeMetadataQuorum received a topic with name {} when {} was expected", +topic.topicName(), METADATA_TOPIC_NAME); +log.debug(msg); +throw new UnknownServerException(msg); +} +if (topic.partitions().size() > 1) { +String msg = String.format("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected", +topic.topicName(), topic.partitions().size()); +log.debug(msg); +throw new UnknownServerException(msg); +} +DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0); +if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) { +String msg = String.format("DescribeMetadataQuorum received a single partition with index {} when {} was expected", +partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition()); +log.debug(msg); +throw new UnknownServerException(msg); +} +if (partition.errorCode() != Errors.NONE.code()) { +throw Errors.forCode(partition.errorCode()).exception(); +} +future.complete(createQuorumResult(partition)); +} catch (RuntimeException e) { +throw e; +} catch (Exception e) { Review Comment: `UnknownServerException` extends `KafkaException`, which extends `RuntimeException`. So I think all the errors that we are raising above get re-thrown in the
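hachikuji's point follows from ordinary Java catch semantics: the first clause whose type matches wins, so a `catch (RuntimeException e)` placed before `catch (Exception e)` intercepts every unchecked exception, `UnknownServerException` included. A minimal sketch with hypothetical stand-in classes (not the real `org.apache.kafka.common` types):

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-in for KafkaException (which extends RuntimeException). */
class FakeKafkaException extends RuntimeException {
    FakeKafkaException(String msg) { super(msg); }
}

/** Hypothetical stand-in for UnknownServerException (which extends KafkaException). */
class FakeUnknownServerException extends FakeKafkaException {
    FakeUnknownServerException(String msg) { super(msg); }
}

final class CatchOrderDemo {
    private CatchOrderDemo() { }

    /** Reports which catch clause handled whatever the body threw. */
    static String whichClause(Callable<?> body) {
        try {
            body.call();
            return "no exception";
        } catch (RuntimeException e) {
            // Every unchecked exception, including FakeUnknownServerException,
            // stops here; the clause below only ever sees checked exceptions.
            return "RuntimeException clause";
        } catch (Exception e) {
            return "Exception clause";
        }
    }
}
```

So in the reviewed block, the `catch (Exception e)` branch can only be reached by checked exceptions.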
[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
[ https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551235#comment-17551235 ] Guozhang Wang commented on KAFKA-13939: --- Once the PR is merged, we can cherry-pick the commit to older branches. But whether the fix would be released depends on whether we have a bug-fix release (e.g. 3.2.1) planned in the future. > Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer > - > > Key: KAFKA-13939 > URL: https://issues.apache.org/jira/browse/KAFKA-13939 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Jackson Newhouse >Priority: Blocker > > If `loggingEnabled` is false, the `dirtyKeys` Set is not cleared within > `flush()`, see > [https://github.com/apache/kafka/blob/3.2/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java#L262]. > However, dirtyKeys is still written to in the loop within `evictWhile`. This > causes dirtyKeys to continuously grow for the life of the buffer.
[GitHub] [kafka] jnewhouse opened a new pull request, #12263: KAFKA-13939: Only track dirty keys if logging is enabled.
jnewhouse opened a new pull request, #12263: URL: https://github.com/apache/kafka/pull/12263 InMemoryTimeOrderedKeyValueBuffer keeps a Set of keys that have been seen in order to log them for durability. This set is neither used nor cleared if logging is not enabled, so populating it anyway creates a memory leak. This change stops populating the set if logging is not enabled. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
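The shape of the fix can be sketched as below. `DirtyKeyTracker` is a hypothetical, heavily simplified stand-in for `InMemoryTimeOrderedKeyValueBuffer`, assuming that `flush()` is the only place that drains the set and that it does so only when logging is enabled:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical simplification; not the real Kafka Streams buffer. */
final class DirtyKeyTracker {
    private final boolean loggingEnabled;
    private final Set<String> dirtyKeys = new HashSet<>();
    private int flushedToChangelog = 0;

    DirtyKeyTracker(boolean loggingEnabled) {
        this.loggingEnabled = loggingEnabled;
    }

    /** Called whenever a key is updated or evicted. */
    void markDirty(String key) {
        // The fix: with logging disabled nothing ever drains dirtyKeys,
        // so do not populate it in the first place.
        if (loggingEnabled) {
            dirtyKeys.add(key);
        }
    }

    /** Mirrors flush(): only the logging path clears the set. */
    void flush() {
        if (loggingEnabled) {
            flushedToChangelog += dirtyKeys.size();
            dirtyKeys.clear();
        }
    }

    int pendingDirtyKeys() { return dirtyKeys.size(); }

    int flushedCount() { return flushedToChangelog; }
}
```

Without the guard in `markDirty`, a tracker created with `loggingEnabled = false` would retain every key it ever saw, which is the unbounded growth described in KAFKA-13939.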
[GitHub] [kafka] hachikuji opened a new pull request, #12262: MINOR: Fix kraft timeout in LogOffsetTest
hachikuji opened a new pull request, #12262: URL: https://github.com/apache/kafka/pull/12262 We have been seeing a lot of timeouts in `LogOffsetTest` when KRaft is enabled. The problem is the dependence on `MockTime`. In the KRaft broker, we depend on having a steadily advancing time for events in `KafkaEventQueue` to get executed. In the case of the timeouts, the broker was stuck with the next heartbeat event in the queue, which we depended on in order to send the next heartbeat and complete the `initialCatchUpFuture`, which is needed to finish startup. As far as I can tell, the test does not have a strong dependence on `MockTime`, so I have replaced it with system time. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
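The failure mode can be reproduced in miniature. `DeadlineQueue` below is a hypothetical deadline-ordered queue, not `KafkaEventQueue`; it assumes only that an event runs once the clock reaches its deadline. With a clock that never advances (as with an untouched mock clock), a scheduled heartbeat never runs:

```java
import java.util.PriorityQueue;
import java.util.function.LongSupplier;

/** Hypothetical deadline-based event queue; not Kafka's implementation. */
final class DeadlineQueue {
    private static final class Event {
        final long deadlineMs;
        final Runnable action;

        Event(long deadlineMs, Runnable action) {
            this.deadlineMs = deadlineMs;
            this.action = action;
        }
    }

    private final LongSupplier clockMs;
    // Earliest deadline first
    private final PriorityQueue<Event> events =
        new PriorityQueue<>((a, b) -> Long.compare(a.deadlineMs, b.deadlineMs));

    DeadlineQueue(LongSupplier clockMs) {
        this.clockMs = clockMs;
    }

    void scheduleAfter(long delayMs, Runnable action) {
        events.add(new Event(clockMs.getAsLong() + delayMs, action));
    }

    /** Runs every event whose deadline has been reached; returns how many ran. */
    int runDue() {
        int ran = 0;
        while (!events.isEmpty() && events.peek().deadlineMs <= clockMs.getAsLong()) {
            events.poll().action.run();
            ran++;
        }
        return ran;
    }
}
```

Passing `System::currentTimeMillis` as the clock corresponds to the switch to system time in this PR: deadlines are eventually reached without the test having to advance anything.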
[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
[ https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551217#comment-17551217 ] Jackson Newhouse commented on KAFKA-13939: -- What's the protocol for back-porting a fix like this?
[jira] [Commented] (KAFKA-13939) Memory Leak When Logging Is Disabled In InMemoryTimeOrderedKeyValueBuffer
[ https://issues.apache.org/jira/browse/KAFKA-13939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551215#comment-17551215 ] Jackson Newhouse commented on KAFKA-13939: -- I'll open a PR.
[GitHub] [kafka] jsancio merged pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool
jsancio merged PR #12245: URL: https://github.com/apache/kafka/pull/12245
[GitHub] [kafka] mumrah commented on a diff in pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool
mumrah commented on code in PR #12245: URL: https://github.com/apache/kafka/pull/12245#discussion_r891572046 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -97,9 +97,9 @@ object StorageTool extends Logging { help("The cluster ID to use.") formatParser.addArgument("--ignore-formatted", "-g"). action(storeTrue()) -formatParser.addArgument("--metadata-version", "-v"). Review Comment: That's right, it has not been included in any release. We introduced it in 1135f22eaf which was pretty recent.
[GitHub] [kafka] jsancio commented on a diff in pull request #12245: KAFKA-13410; Add a --release-version flag for storage-tool
jsancio commented on code in PR #12245: URL: https://github.com/apache/kafka/pull/12245#discussion_r891557424 ## core/src/main/scala/kafka/tools/StorageTool.scala: ## @@ -97,9 +97,9 @@ object StorageTool extends Logging { help("The cluster ID to use.") formatParser.addArgument("--ignore-formatted", "-g"). action(storeTrue()) -formatParser.addArgument("--metadata-version", "-v"). Review Comment: We don't need to deprecate this because this version of the storage tool was never released, right? `git tag --contains` doesn't show any reference.
[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
niket-goel commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r891554323 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4337,25 +4337,22 @@ public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuoru final Call call = new Call( "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { -private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) { -Integer partition = 0; -String topicName = response.getTopicNameByIndex(0); -Integer leaderId = response.getPartitionLeaderId(topicName, partition); +private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { List voters = new ArrayList<>(); List observers = new ArrayList<>(); -response.getVoterInfo(topicName, partition).forEach(v -> { +partition.currentVoters().forEach(v -> { voters.add(new QuorumInfo.ReplicaState(v.replicaId(), v.logEndOffset(), OptionalLong.of(v.lastFetchTimestamp()), OptionalLong.of(v.lastCaughtUpTimestamp(; Review Comment: I went back and forth between the two and ended up returning a -1 optional here. I now remember that the original intention was to have an empty optional. Will address this.
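One way to get the empty-optional behavior is to map the sentinel at the point of conversion. This sketch assumes `-1` is the wire value meaning "unknown" for `lastFetchTimestamp` and `lastCaughtUpTimestamp`; check the message schema before relying on that. `TimestampOptionals` is a hypothetical helper:

```java
import java.util.OptionalLong;

/** Hypothetical helper; not part of the Kafka admin client. */
final class TimestampOptionals {
    /** Assumed "unknown" sentinel; verify against the actual schema default. */
    static final long UNKNOWN = -1L;

    private TimestampOptionals() { }

    /** Returns OptionalLong.empty() for the sentinel instead of OptionalLong.of(-1). */
    static OptionalLong fromWire(long value) {
        return value == UNKNOWN ? OptionalLong.empty() : OptionalLong.of(value);
    }
}
```

Call sites would then read `TimestampOptionals.fromWire(v.lastFetchTimestamp())` rather than `OptionalLong.of(v.lastFetchTimestamp())`. Matching only `-1` (rather than any negative value) keeps the mapping strict, so an unexpected negative timestamp still surfaces instead of silently becoming empty.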
[GitHub] [kafka] junrao commented on a diff in pull request #12250: KAFKA-13935 Fix static usages of IBP in KRaft mode
junrao commented on code in PR #12250: URL: https://github.com/apache/kafka/pull/12250#discussion_r891551766 ## core/src/main/scala/kafka/server/KafkaConfig.scala: ## @@ -1790,38 +1790,13 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0` // is passed, `0.10.0-IV0` may be picked) val interBrokerProtocolVersionString = getString(KafkaConfig.InterBrokerProtocolVersionProp) - val interBrokerProtocolVersion = MetadataVersion.fromVersionString(interBrokerProtocolVersionString) - - val fetchRequestVersion: Short = -if (interBrokerProtocolVersion.isAtLeast(IBP_3_1_IV0)) 13 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_7_IV1)) 12 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 11 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV2)) 10 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 8 -else if (interBrokerProtocolVersion.isAtLeast(IBP_1_1_IV0)) 7 -else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV1)) 5 -else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 4 -else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV1)) 3 -else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_0_IV0)) 2 -else if (interBrokerProtocolVersion.isAtLeast(IBP_0_9_0)) 1 -else 0 - - val offsetForLeaderEpochRequestVersion: Short = -if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 4 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_3_IV1)) 3 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 2 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV0)) 1 -else 0 - - val listOffsetRequestVersion: Short = -if (interBrokerProtocolVersion.isAtLeast(IBP_3_0_IV1)) 7 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_8_IV0)) 6 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_2_IV1)) 5 -else if (interBrokerProtocolVersion.isAtLeast(IBP_2_1_IV1)) 4 -else if 
(interBrokerProtocolVersion.isAtLeast(IBP_2_0_IV1)) 3 -else if (interBrokerProtocolVersion.isAtLeast(IBP_0_11_0_IV0)) 2 -else if (interBrokerProtocolVersion.isAtLeast(IBP_0_10_1_IV2)) 1 -else 0 + val interBrokerProtocolVersion = if (processRoles.isEmpty) { Review Comment: @mumrah : Thanks for the explanation. Covering that in a separate PR sounds good to me.
[GitHub] [kafka] jsancio commented on pull request #12236: MINOR: Use right enum value for broker registration change
jsancio commented on PR #12236: URL: https://github.com/apache/kafka/pull/12236#issuecomment-1148986153 @dengziming I merged KAFKA-13916. When you have some time, can you take a look at this change again?
[GitHub] [kafka] jsancio merged pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
jsancio merged PR #12240: URL: https://github.com/apache/kafka/pull/12240
[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
niket-goel commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r891518982 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } +@Override +public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { +NodeProvider provider = new LeastLoadedNodeProvider(); + +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final Call call = new Call( +"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + +private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { +List voters = new ArrayList<>(); +List observers = new ArrayList<>(); +partition.currentVoters().forEach(v -> { +voters.add(new QuorumInfo.ReplicaState(v.replicaId(), +v.logEndOffset(), +OptionalLong.of(v.lastFetchTimestamp()), +OptionalLong.of(v.lastCaughtUpTimestamp(; +}); +partition.observers().forEach(o -> { +observers.add(new QuorumInfo.ReplicaState(o.replicaId(), +o.logEndOffset(), +OptionalLong.of(o.lastFetchTimestamp()), +OptionalLong.of(o.lastCaughtUpTimestamp(; +}); +QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers); +return info; +} + +@Override +DescribeQuorumRequest.Builder createRequest(int timeoutMs) { +return new Builder(DescribeQuorumRequest.singletonRequest( +new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition(; +} + +@Override +void handleResponse(AbstractResponse response) { +final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; +try { +if (quorumResponse.data().errorCode() != Errors.NONE.code()) { +throw Errors.forCode(quorumResponse.data().errorCode()).exception(); +} +if (quorumResponse.data().topics().size() > 1) { +String msg = String.format("DescribeMetadataQuorum received {} 
topics when 1 was expected", +quorumResponse.data().topics().size()); +log.debug(msg); +throw new UnknownServerException(msg); +} +DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0); +if (!topic.topicName().equals(METADATA_TOPIC_NAME)) { +String msg = String.format("DescribeMetadataQuorum received a topic with name {} when {} was expected", +topic.topicName(), METADATA_TOPIC_NAME); +log.debug(msg); +throw new UnknownServerException(msg); +} +if (topic.partitions().size() > 1) { +String msg = String.format("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected", +topic.topicName(), topic.partitions().size()); +log.debug(msg); +throw new UnknownServerException(msg); +} +DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0); +if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) { +String msg = String.format("DescribeMetadataQuorum received a single partition with index {} when {} was expected", +partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition()); +log.debug(msg); +throw new UnknownServerException(msg); +} +if (partition.errorCode() != Errors.NONE.code()) { +throw Errors.forCode(partition.errorCode()).exception(); +} +future.complete(createQuorumResult(partition)); +} catch (RuntimeException e) { +throw e; +} catch (Exception e) { Review Comment: So this block is me trying to have a single `future.completeExceptionally()` call in this code block. We are catching `UnknownServerException` and any exception
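A minimal sketch of the single-exit idea, using `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFutureImpl`: every validation failure is thrown inside one `try`, and exactly one `catch` completes the future exceptionally. `SingleExitHandler` is a hypothetical illustration, not the admin client's code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical illustration of funnelling all failures to one completion point. */
final class SingleExitHandler {
    private SingleExitHandler() { }

    static <T> CompletableFuture<T> handle(Supplier<T> parseAndValidate) {
        CompletableFuture<T> future = new CompletableFuture<>();
        try {
            // Each validation check simply throws; nothing here touches the future
            // except the success path below.
            future.complete(parseAndValidate.get());
        } catch (RuntimeException e) {
            // The only place the future is completed exceptionally.
            future.completeExceptionally(e);
        }
        return future;
    }
}
```

Because every check just throws, adding another validation later cannot accidentally skip completing the future.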
[GitHub] [kafka] niket-goel commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
niket-goel commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r891518561 ## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ## @@ -4321,6 +4328,94 @@ void handleFailure(Throwable throwable) { return new UpdateFeaturesResult(new HashMap<>(updateFutures)); } +@Override +public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options) { +NodeProvider provider = new LeastLoadedNodeProvider(); + +final KafkaFutureImpl future = new KafkaFutureImpl<>(); +final long now = time.milliseconds(); +final Call call = new Call( +"describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) { + +private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { +List voters = new ArrayList<>(); +List observers = new ArrayList<>(); +partition.currentVoters().forEach(v -> { +voters.add(new QuorumInfo.ReplicaState(v.replicaId(), +v.logEndOffset(), +OptionalLong.of(v.lastFetchTimestamp()), +OptionalLong.of(v.lastCaughtUpTimestamp(; +}); +partition.observers().forEach(o -> { +observers.add(new QuorumInfo.ReplicaState(o.replicaId(), +o.logEndOffset(), +OptionalLong.of(o.lastFetchTimestamp()), +OptionalLong.of(o.lastCaughtUpTimestamp(; +}); +QuorumInfo info = new QuorumInfo(partition.leaderId(), voters, observers); +return info; +} + +@Override +DescribeQuorumRequest.Builder createRequest(int timeoutMs) { +return new Builder(DescribeQuorumRequest.singletonRequest( +new TopicPartition(METADATA_TOPIC_NAME, METADATA_TOPIC_PARTITION.partition(; +} + +@Override +void handleResponse(AbstractResponse response) { +final DescribeQuorumResponse quorumResponse = (DescribeQuorumResponse) response; +try { +if (quorumResponse.data().errorCode() != Errors.NONE.code()) { +throw Errors.forCode(quorumResponse.data().errorCode()).exception(); +} +if (quorumResponse.data().topics().size() > 1) { +String msg = String.format("DescribeMetadataQuorum received {} 
topics when 1 was expected", +quorumResponse.data().topics().size()); +log.debug(msg); +throw new UnknownServerException(msg); +} +DescribeQuorumResponseData.TopicData topic = quorumResponse.data().topics().get(0); +if (!topic.topicName().equals(METADATA_TOPIC_NAME)) { +String msg = String.format("DescribeMetadataQuorum received a topic with name {} when {} was expected", +topic.topicName(), METADATA_TOPIC_NAME); +log.debug(msg); +throw new UnknownServerException(msg); +} +if (topic.partitions().size() > 1) { +String msg = String.format("DescribeMetadataQuorum received a topic {} with {} partitions when 1 was expected", +topic.topicName(), topic.partitions().size()); +log.debug(msg); +throw new UnknownServerException(msg); +} +DescribeQuorumResponseData.PartitionData partition = topic.partitions().get(0); +if (partition.partitionIndex() != METADATA_TOPIC_PARTITION.partition()) { +String msg = String.format("DescribeMetadataQuorum received a single partition with index {} when {} was expected", +partition.partitionIndex(), METADATA_TOPIC_PARTITION.partition()); +log.debug(msg); +throw new UnknownServerException(msg); +} +if (partition.errorCode() != Errors.NONE.code()) { +throw Errors.forCode(partition.errorCode()).exception(); +} +future.complete(createQuorumResult(partition)); +} catch (RuntimeException e) { +throw e; Review Comment: I added this block because I noticed a Gradle warning suggesting that (with the addition of the general Exception catch block) some runtime exceptions might get hidden. A little
[GitHub] [kafka] mimaison commented on pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
mimaison commented on PR #11779: URL: https://github.com/apache/kafka/pull/11779#issuecomment-1148919718 Thanks for the quick updates. I'll try to make another pass tomorrow.
[GitHub] [kafka] mimaison commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
mimaison commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r891478789 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java: ## @@ -320,6 +320,18 @@ public void putTaskConfigs(final @PathParam("connector") String connector, completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward); } +@PUT +@Path("/{connector}/fence") +public Response fenceZombies(final @PathParam("connector") String connector, + final @Context HttpHeaders headers, + final @QueryParam("forward") Boolean forward, + final byte[] requestBody) throws Throwable { +FutureCallback cb = new FutureCallback<>(); +herder.fenceZombies(connector, cb, InternalRequestSignature.fromHeaders(requestBody, headers)); +completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward); +return Response.ok().build(); Review Comment: As far as I can tell, the other internal endpoint returns 204, so I'd be in favor of doing the same here.
[GitHub] [kafka] mimaison commented on a diff in pull request #11473: KAFKA-13436: Omitted BrokerTopicMetrics metrics in the documentation
mimaison commented on code in PR #11473: URL: https://github.com/apache/kafka/pull/11473#discussion_r891403316 ## docs/ops.html: ## @@ -1469,33 +1489,38 @@
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r891395016 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -784,6 +845,14 @@ private static Map connectorClientConfigOverrides(ConnectorTaskI return clientOverrides; } +private String transactionalId(ConnectorTaskId id) { Review Comment: Ah whoops, those changes were made on https://github.com/apache/kafka/pull/11780, which hasn't been merged yet, so a rebase isn't going to automatically draw them in. I'll do the change manually here but there may be other small changes in not-yet-merged PRs that don't get pulled in here. It should be fine as those changes are included in whichever PR gets merged last.
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r891380187 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -291,7 +328,16 @@ public void start() { log.info("Starting KafkaConfigBackingStore"); // Before startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that // updates can continue to occur in the background -configLog.start(); +try { +configLog.start(); +} catch (UnsupportedVersionException e) { +throw new ConnectException( +"Enabling exactly-once support for source connectors requires a Kafka broker version that allows " ++ "admin clients to read consumer offsets. Disable the worker's exactly-once support " ++ "for source connectors, or use a new Kafka broker version.", Review Comment: Haha yep, caught and fixed this in an upstream PR that's since been merged. Will pick up in the rebase. https://github.com/apache/kafka/blob/a6c5a74fdbdce9a992b47706913c920902cda28c/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L323 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -309,16 +355,70 @@ public void start() { @Override public void stop() { log.info("Closing KafkaConfigBackingStore"); -try { -configLog.stop(); -} finally { -if (ownTopicAdmin != null) { -ownTopicAdmin.close(); -} + +if (fencableProducer != null) { +Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic"); } +Utils.closeQuietly(ownTopicAdmin, "admin for config topic"); +Utils.closeQuietly(configLog::stop, "KafkaBasedLog for config topic"); + log.info("Closed KafkaConfigBackingStore"); } +@Override +public void claimWritePrivileges() { +if (usesFencableWriter && fencableProducer == null) { +try { +fencableProducer = createFencableProducer(); +fencableProducer.initTransactions(); +} catch (Exception e) { +if 
(fencableProducer != null) { +Utils.closeQuietly(() -> fencableProducer.close(Duration.ZERO), "fencable producer for config topic"); +fencableProducer = null; +} +throw new ConnectException("Failed to create and initialize fencable producer for config topic", e); +} +} +} + +private Map baseProducerProps(WorkerConfig workerConfig) { +Map producerProps = new HashMap<>(workerConfig.originals()); +String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(workerConfig); +producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); +producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); +producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.MAX_VALUE); +ConnectUtils.addMetricsContextProperties(producerProps, workerConfig, kafkaClusterId); +return producerProps; +} + +// Visible for testing +Map fencableProducerProps(DistributedConfig workerConfig) { +Map result = new HashMap<>(baseProducerProps(workerConfig)); + +// Always require producer acks to all to ensure durable writes +result.put(ProducerConfig.ACKS_CONFIG, "all"); +// Don't allow more than one in-flight request to prevent reordering on retry (if enabled) +result.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); Review Comment: Yep, this got fixed in https://github.com/apache/kafka/pull/11778, which just got merged. A rebase should take care of this.
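The `claimWritePrivileges` and `stop` changes above follow a common pattern: create the transactional writer lazily, close and reset it if initialization fails so that a later call can retry, and close each resource independently on shutdown so one failure does not leak the others. A minimal stdlib-only sketch of that pattern follows; `FencableWriter`, `ConfigStoreSketch`, and the use of `IllegalStateException` in place of `ConnectException` are hypothetical stand-ins, not Kafka classes.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a transactional producer; not a Kafka class.
interface FencableWriter extends AutoCloseable {
    void initTransactions();

    @Override
    void close(); // narrowed: no checked exception
}

class ConfigStoreSketch {
    private final boolean usesFencableWriter;
    private final Supplier<FencableWriter> factory;
    private FencableWriter fencableWriter; // null until write privileges are claimed

    ConfigStoreSketch(boolean usesFencableWriter, Supplier<FencableWriter> factory) {
        this.usesFencableWriter = usesFencableWriter;
        this.factory = factory;
    }

    // Lazily create and initialize the writer; on failure, close and reset it
    // so that a later call starts from a clean state.
    void claimWritePrivileges() {
        if (!usesFencableWriter || fencableWriter != null) {
            return;
        }
        try {
            fencableWriter = factory.get();
            fencableWriter.initTransactions();
        } catch (RuntimeException e) {
            if (fencableWriter != null) {
                closeQuietly(fencableWriter, "fencable writer for config topic");
                fencableWriter = null;
            }
            throw new IllegalStateException("Failed to create and initialize fencable writer", e);
        }
    }

    boolean hasWritePrivileges() {
        return fencableWriter != null;
    }

    // Each resource is closed on its own, so an exception here never stops later closes.
    void stop() {
        closeQuietly(fencableWriter, "fencable writer for config topic");
        fencableWriter = null;
    }

    static void closeQuietly(AutoCloseable resource, String name) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            System.err.println("Failed to close " + name + ": " + e);
        }
    }
}
```

Resetting the field to `null` after a failed `initTransactions()` is what lets the `fencableWriter != null` guard double as an "already claimed" check.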
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r891379394 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java: ## @@ -280,4 +326,4 @@ public int hashCode() { inconsistentConnectors, configTransformer); } -} +} Review Comment: Sorry, done. ## connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java: ## @@ -139,14 +140,23 @@ public void testNoFileOption() throws IOException { jaasBasicAuthFilter.filter(requestContext); verify(requestContext).abortWith(any(Response.class)); -verify(requestContext).getMethod(); +verify(requestContext, atLeastOnce()).getMethod(); verify(requestContext).getHeaderString(JaasBasicAuthFilter.AUTHORIZATION); } @Test -public void testPostWithoutAppropriateCredential() throws IOException { +public void testInternalTaskConfigEndpointSkipped() throws IOException { +testInternalEndpointSkipped("connectors/connName/tasks"); +} + +@Test +public void testInternalZombieFencingEndpointSkipped() throws IOException { +testInternalEndpointSkipped("connectors/connName/fence"); +} + +private void testInternalEndpointSkipped(String endpoint) throws IOException { UriInfo uriInfo = mock(UriInfo.class); -when(uriInfo.getPath()).thenReturn("connectors/connName/tasks"); +when(uriInfo.getPath()).thenReturn(endpoint); ContainerRequestContext requestContext = mock(ContainerRequestContext.class); when(requestContext.getMethod()).thenReturn(HttpMethod.POST); Review Comment: This test was broken and did not catch calls to `ContainerRequestContext::abortWith`. I've updated the test to catch those calls and, after it started failing, also updated it to use the correct HTTP method. Good catch, thanks!
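The updated test covers two internal endpoints that the basic-auth filter is expected to skip: `POST connectors/{name}/tasks` and the new `PUT connectors/{name}/fence`. As a hedged sketch of why the HTTP method in the test matters, the hypothetical matcher below treats a request as internal only when both the method and the path match; the class name and the regular expressions are assumptions, not the filter's actual patterns.

```java
import java.util.regex.Pattern;

// Hypothetical matcher for internal worker-to-worker endpoints; not Kafka code.
final class InternalEndpoints {
    // Assumed path shapes for the two internal endpoints discussed in the review.
    private static final Pattern TASK_CONFIGS = Pattern.compile("/?connectors/[^/]+/tasks/?");
    private static final Pattern FENCE = Pattern.compile("/?connectors/[^/]+/fence/?");

    private InternalEndpoints() { }

    // Both method and path must match: a POST to the fence path, or a GET
    // to the task-configs path, is treated as an ordinary (authenticated) request.
    static boolean isInternal(String method, String path) {
        if ("POST".equals(method)) {
            return TASK_CONFIGS.matcher(path).matches();
        }
        if ("PUT".equals(method)) {
            return FENCE.matcher(path).matches();
        }
        return false;
    }
}
```

With a check shaped like this, a test that mocks `POST` for the fence path would see the request rejected, which is consistent with the test starting to fail until it used the correct method.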
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r891379608 ## connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java: ## @@ -139,14 +140,23 @@ public void testNoFileOption() throws IOException { jaasBasicAuthFilter.filter(requestContext); verify(requestContext).abortWith(any(Response.class)); -verify(requestContext).getMethod(); +verify(requestContext, atLeastOnce()).getMethod(); verify(requestContext).getHeaderString(JaasBasicAuthFilter.AUTHORIZATION); } @Test -public void testPostWithoutAppropriateCredential() throws IOException { +public void testInternalTaskConfigEndpointSkipped() throws IOException { +testInternalEndpointSkipped("connectors/connName/tasks"); +} + +@Test +public void testInternalZombieFencingEndpointSkipped() throws IOException { +testInternalEndpointSkipped("connectors/connName/fence"); +} + +private void testInternalEndpointSkipped(String endpoint) throws IOException { UriInfo uriInfo = mock(UriInfo.class); -when(uriInfo.getPath()).thenReturn("connectors/connName/tasks"); +when(uriInfo.getPath()).thenReturn(endpoint); ContainerRequestContext requestContext = mock(ContainerRequestContext.class); when(requestContext.getMethod()).thenReturn(HttpMethod.POST); Review Comment: This test was broken and did not catch calls to `ContainerRequestContext::abortWith`. I've updated the test to catch those calls and, after it started failing, also updated it to use the correct HTTP method. Good catch, thanks! ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -784,6 +845,14 @@ private static Map connectorClientConfigOverrides(ConnectorTaskI return clientOverrides; } +private String transactionalId(ConnectorTaskId id) { Review Comment: Yep, exactly. Going to try to do the rebase today, but I may not be able to finish by EOD, as it's going to be fairly involved.
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -784,6 +845,14 @@ private static Map connectorClientConfigOverrides(ConnectorTaskI return clientOverrides; } +private String transactionalId(ConnectorTaskId id) { +return transactionalId(config.groupId(), id.connector(), id.task()); +} + +public static String transactionalId(String groupId, String connector, int taskId) { Review Comment: It's used in integration tests later on: https://github.com/C0urante/kafka/blob/3d65e799925096d519b4adf906be05cba70addeb/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java#L828 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -201,6 +214,9 @@ public static String COMMIT_TASKS_KEY(String connectorName) { public static final Schema TARGET_STATE_V0 = SchemaBuilder.struct() .field("state", Schema.STRING_SCHEMA) .build(); +public static final Schema TASK_COUNT_RECORD_V0 = SchemaBuilder.struct() +.field("tasks", Schema.INT32_SCHEMA) Review Comment: I think, given the key format ("tasks-count-connector"), this is probably fine, and the name of the field is also specified in the KIP. But, similar to the 200 vs. 204 HTTP response for the fencing endpoint, this is internal and a small detail, so I can change it if we agree that this kind of detail doesn't need to precisely match what's in the KIP.
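KIP-618 derives a per-task transactional ID from the worker's group ID, the connector name, and the task number. A minimal sketch, assuming the `${groupId}-${connector}-${taskId}` format; `TransactionalIds` is a hypothetical class, not the `Worker` implementation itself. Keeping the method static is what allows integration tests to compute the expected ID without a `Worker` instance, which is the reason given in the review.

```java
// Hypothetical helper; the ID format is an assumption based on KIP-618.
final class TransactionalIds {
    private TransactionalIds() { }

    // <worker group ID>-<connector name>-<task number>
    static String transactionalId(String groupId, String connector, int taskId) {
        return String.format("%s-%s-%d", groupId, connector, taskId);
    }
}
```

Because group IDs and connector names may themselves contain hyphens, an ID built this way is meant to be computed from its parts, not parsed back into them.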
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r891379125 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java: ## @@ -138,6 +138,18 @@ public interface Herder { */ void putTaskConfigs(String connName, List> configs, Callback callback, InternalRequestSignature requestSignature); +/** + * Fence out any older task generations for a source connector, and then write a record to the config topic + * indicating that it is safe to bring up a new generation of tasks. If that record is already present, do nothing + * and invoke the callback successfully. + * @param connName the name of the connector to fence out, which must refer to a source connector; if the + * connector does not exist or is not a source connector, the callback will be invoked with an error + * @param callback callback to invoke upon completion + * @param requestSignature the signature of the request made for this connector; + * may be null if no signature was provided + */ +void fenceZombies(String connName, Callback callback, InternalRequestSignature requestSignature); Review Comment: Fine by me ## gradle/spotbugs-exclude.xml: ## @@ -311,6 +311,16 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + +
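The `fenceZombies` javadoc above defines a small contract: report an error for an unknown or non-source connector; otherwise fence out older task generations and write a record; and if that record already exists, do nothing and still report success. A hypothetical in-memory sketch of that contract follows, with a counter standing in for fencing and a set standing in for the config-topic record. `CompletionCallback` and `FencingHerderSketch` are illustrative names, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical (error, result) completion callback.
interface CompletionCallback<V> {
    void onCompletion(Throwable error, V result);
}

// In-memory sketch of the documented fenceZombies contract; not Kafka code.
class FencingHerderSketch {
    private final Map<String, Boolean> isSourceConnector = new HashMap<>();
    private final Set<String> fencingRecordWritten = new HashSet<>();
    private int fencingRounds = 0;

    void addConnector(String name, boolean isSource) {
        isSourceConnector.put(name, isSource);
    }

    int fencingRounds() {
        return fencingRounds;
    }

    void fenceZombies(String connName, CompletionCallback<Void> callback) {
        Boolean isSource = isSourceConnector.get(connName);
        if (isSource == null) {
            callback.onCompletion(new IllegalArgumentException("Connector " + connName + " not found"), null);
            return;
        }
        if (!isSource) {
            callback.onCompletion(new IllegalArgumentException("Connector " + connName + " is not a source connector"), null);
            return;
        }
        if (fencingRecordWritten.contains(connName)) {
            // Record already present: nothing to fence, report success.
            callback.onCompletion(null, null);
            return;
        }
        fencingRounds++;                    // stands in for fencing out older task generations
        fencingRecordWritten.add(connName); // stands in for the config-topic record
        callback.onCompletion(null, null);
    }
}
```

The idempotent branch matters because the request may be retried or forwarded; a second call should succeed without fencing again.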
[GitHub] [kafka] mumrah commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
mumrah commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r891371390 ## metadata/src/main/resources/common/metadata/BrokerRegistrationChangeRecord.json: ## @@ -17,14 +17,16 @@ "apiKey": 17, "type": "metadata", "name": "BrokerRegistrationChangeRecord", - "validVersions": "0", + "validVersions": "0-1", Review Comment: I actually think we should not increase the record version unless there is an incompatible change. To my knowledge, that is what the record version has conveyed historically. However, I don't think there's any _harm_ in increasing it, and I also don't think we need to solve it in this PR. Let's go with Colin's suggestion here, and we can continue this discussion on the mailing list or offline.
[GitHub] [kafka] C0urante commented on pull request #11778: KAFKA-10000: Use transactional producer for config topic (KIP-618)
C0urante commented on PR #11778: URL: https://github.com/apache/kafka/pull/11778#issuecomment-1148809725 Thanks Luke, and thanks Tom!
[GitHub] [kafka] mumrah commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
mumrah commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r891357405 ## metadata/src/main/java/org/apache/kafka/image/MetadataImage.java: ## @@ -120,10 +121,16 @@ public AclsImage acls() { } public void write(Consumer> out) { +// We use the minimum KRaft metadata version if this image does +// not have a specific version set. +MetadataVersion metadataVersion = features.metadataVersion(); +if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) { +metadataVersion = MetadataVersion.IBP_3_0_IV1; +} Review Comment: Right, once that PR is merged, KRaft will be at metadata version IBP_3_0_IV1 implicitly until the controller finishes bootstrapping. This will be true on both the controller and broker sides.
[jira] [Commented] (KAFKA-13908) RuntimeException will be thrown in BrokerServer.startup, not the cause of exception
[ https://issues.apache.org/jira/browse/KAFKA-13908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551141#comment-17551141 ] Richard Joerger commented on KAFKA-13908: - Luke, thanks so much for the quick insight. I'll head down that path. > RuntimeException will be thrown in BrokerServer.startup, not the cause of > exception > --- > > Key: KAFKA-13908 > URL: https://issues.apache.org/jira/browse/KAFKA-13908 > Project: Kafka > Issue Type: Improvement >Reporter: Luke Chen >Assignee: Richard Joerger >Priority: Major > Labels: newbie > > Before [#11969|https://github.com/apache/kafka/pull/11969], we would throw an > {{ExecutionException(KafkaStorageException)}} in > {{BrokerServer.startup}}, and we'd catch the exception and rethrow the > cause by: > {code:java} > throw if (e.isInstanceOf[ExecutionException]) e.getCause else e{code} > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/Kafka.scala#L113] > > After [#11969|https://github.com/apache/kafka/pull/11969], we throw a > {{RuntimeException(ExecutionException(KafkaStorageException))}}. But the > catch logic didn't change. That means that if the exception is a RuntimeException, > we rethrow the whole exception chain rather than only the cause. > > We should update it and add tests for it.
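The Scala one-liner quoted above unwraps a single `ExecutionException` level, so the extra `RuntimeException` wrapper introduced by #11969 slips through. A hedged Java sketch of a helper that peels off both kinds of wrapper follows. Only an exception whose class is exactly `RuntimeException` is treated as a wrapper, so meaningful subclasses (such as a storage exception) are kept. `StartupFailures` is a hypothetical name, `IllegalStateException` stands in for `KafkaStorageException` in the test, and this is not the fix that was eventually merged.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical helper; not Kafka code.
final class StartupFailures {
    private StartupFailures() { }

    // Walk down the cause chain while the current exception is a generic wrapper:
    // an ExecutionException, or a plain RuntimeException (exact class only).
    static Throwable rootStartupCause(Throwable e) {
        Throwable current = e;
        while (current.getCause() != null
                && (current instanceof ExecutionException
                    || current.getClass() == RuntimeException.class)) {
            current = current.getCause();
        }
        return current;
    }
}
```

Matching on the exact class rather than `instanceof RuntimeException` is the key design choice; otherwise any unchecked exception that happens to carry a cause would be unwrapped too.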
[GitHub] [kafka] mimaison commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…
mimaison commented on code in PR #12248: URL: https://github.com/apache/kafka/pull/12248#discussion_r891335380 ## core/src/main/scala/kafka/server/ReplicaManager.scala: ## @@ -812,13 +817,16 @@ class ReplicaManager(val config: KafkaConfig, new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) .setErrorCode(Errors.NONE.code).setTopics(topicInfos) + .setTotalBytes(totalBytes).setUsableBytes(usableBytes) case None => new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath) .setErrorCode(Errors.NONE.code) + .setTotalBytes(totalBytes).setUsableBytes(usableBytes) } } catch { case e: KafkaStorageException => + e.printStackTrace() Review Comment: Oops, no, we obviously don't want that! Thanks, fixed.
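PR #12248 adds total and usable byte counts to each `DescribeLogDirsResult`. On the JVM, one way to obtain those numbers is `java.nio.file.FileStore`. The sketch below assumes that approach (the PR's Scala code may use a different call) and lets I/O failures propagate to the caller, which can map them to an error code instead of printing a stack trace like the removed line. `LogDirSpace` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical value holder for a log directory's disk space; not Kafka code.
final class LogDirSpace {
    final long totalBytes;
    final long usableBytes;

    private LogDirSpace(long totalBytes, long usableBytes) {
        this.totalBytes = totalBytes;
        this.usableBytes = usableBytes;
    }

    // Query the file store that backs the directory. Errors propagate to the
    // caller instead of being printed and swallowed.
    static LogDirSpace of(Path logDir) throws IOException {
        FileStore store = Files.getFileStore(logDir);
        return new LogDirSpace(store.getTotalSpace(), store.getUsableSpace());
    }
}
```

`getUsableSpace()` accounts for permissions and quotas visible to the JVM, so it can be lower than the raw free space the OS reports.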
[GitHub] [kafka] ijuma merged pull request #12259: HOTFIX: only try to clear discover-coordinator future upon commit
ijuma merged PR #12259: URL: https://github.com/apache/kafka/pull/12259
[GitHub] [kafka] mimaison commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
mimaison commented on code in PR #11779: URL: https://github.com/apache/kafka/pull/11779#discussion_r890986946 ## gradle/spotbugs-exclude.xml: ## @@ -311,6 +311,16 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + +
[GitHub] [kafka] ijuma commented on a diff in pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc
ijuma commented on code in PR #12261: URL: https://github.com/apache/kafka/pull/12261#discussion_r891281407 ## docs/upgrade.html: ## @@ -213,6 +213,9 @@ Notable changes in 3 Notable changes in 3.0.0 +Java 8 and Scala 2.12 support have been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0. +See https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308223;>KIP-750 +and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181308218;>KIP-751 for more details. Review Comment: Maybe move this to the second line since it's less impactful than the producer idempotence change.
[GitHub] [kafka] ijuma commented on a diff in pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc
ijuma commented on code in PR #12261: URL: https://github.com/apache/kafka/pull/12261#discussion_r891279768 ## docs/ops.html: ## @@ -1266,7 +1266,8 @@ 6.6 Java Version - Java 8 and Java 11 are supported. Java 11 performs significantly better if TLS is enabled, so it is highly recommended (it also includes a number of other + Java 8, Java 11, and Java 17 are supported. Note that Java 8 support has been deprecated since Apache Kafka 3.0 and will be removed in Apache Kafka 4.0. + Java 11 and later versions performs significantly better if TLS is enabled, so it is highly recommended (it also includes a number of other Review Comment: "Java 11 and later versions perform significantly better if TLS is enabled, so they are highly recommended (they also include a number of other"
[GitHub] [kafka] cadonna merged pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
cadonna merged PR #12235: URL: https://github.com/apache/kafka/pull/12235
[GitHub] [kafka] cadonna commented on pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
cadonna commented on PR #12235: URL: https://github.com/apache/kafka/pull/12235#issuecomment-1148710618 The test failures are unrelated.
[GitHub] [kafka] cadonna commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
cadonna commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r891064878 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java: ## @@ -166,4 +171,46 @@ public static String extractThreadId(final String fullThreadName) { final int index = fullThreadName.indexOf("StreamThread-"); return fullThreadName.substring(index); } + +public static long producerRecordSizeInBytes(final ProducerRecord record) { +return recordSizeInBytes( +record.key().length, +record.value() == null ? 0 : record.value().length, +record.topic(), +record.headers() +); +} + +public static long consumerRecordSizeInBytes(final ConsumerRecord record) { +return recordSizeInBytes( +record.serializedKeySize(), +record.serializedValueSize(), +record.topic(), +record.headers() +); +} + +public static long recordSizeInBytes(final long keyBytes, Review Comment: nit: Could you make this private? ## streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java: ## @@ -46,13 +47,15 @@ public void send(final String topic, final Integer partition, final Long timestamp, final Serializer keySerializer, -final Serializer valueSerializer) { +final Serializer valueSerializer, +final String processorNodeId, +final InternalProcessorContext context) { collected.add(new ProducerRecord<>(topic, -partition, -timestamp, -key, -value, -headers)); + partition, + timestamp, + key, + value, + headers)); Review Comment: nit: The indentation was actually right since it is a method call and not a method declaration. ## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java: ## @@ -192,6 +217,28 @@ public void send(final String topic, } else { log.warn("Received offset={} in produce response for {}", metadata.offset(), tp); } + +if (!topic.endsWith("-changelog")) { +// we may not have created a sensor yet if the node uses dynamic topic routing Review Comment: This comment is a bit misleading here. 
AFAIU it refers to the `else`-branch. Please move it or remove it. I think you know my preference.
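`recordSizeInBytes` combines the serialized key and value sizes with the topic name and headers. A simplified sketch follows, assuming size = key bytes + value bytes + UTF-8 topic bytes + header key and value bytes (the PR may also count fixed-size fields). A `Map<String, byte[]>` stands in for Kafka's `Headers`, which, unlike a map, can hold duplicate keys. The shared helper is private, as suggested in the review, and both key and value are null-guarded because an unkeyed record has a null serialized key. `RecordSizes` is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical, simplified size estimate; not the Kafka Streams implementation.
final class RecordSizes {
    private RecordSizes() { }

    // Public entry point for serialized producer-side records; key and value may be null.
    static long producerRecordSizeInBytes(byte[] key, byte[] value, String topic, Map<String, byte[]> headers) {
        return recordSizeInBytes(
            key == null ? 0 : key.length,
            value == null ? 0 : value.length,
            topic,
            headers);
    }

    // Kept private: callers go through the typed entry points.
    private static long recordSizeInBytes(long keyBytes, long valueBytes, String topic, Map<String, byte[]> headers) {
        long headerBytes = 0L;
        if (headers != null) {
            for (Map.Entry<String, byte[]> header : headers.entrySet()) {
                headerBytes += header.getKey().getBytes(StandardCharsets.UTF_8).length;
                if (header.getValue() != null) {
                    headerBytes += header.getValue().length;
                }
            }
        }
        return keyBytes + valueBytes + topic.getBytes(StandardCharsets.UTF_8).length + headerBytes;
    }
}
```

For example, a 3-byte key, a null value, topic `abc`, and one header `h` with a 2-byte value add up to 3 + 0 + 3 + 1 + 2 = 9 bytes under this assumption.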
[jira] [Resolved] (KAFKA-13335) Upgrading connect from 2.7.0 to 2.8.0 causes worker instability
[ https://issues.apache.org/jira/browse/KAFKA-13335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Gray resolved KAFKA-13335. --- Resolution: Not A Problem Finally got back to this after a long time. This is no bug or fault of Kafka Connect. We have a lot of connectors, so it takes a while to rebalance all of them. We were simply constantly hitting the rebalance.timeout.ms, leaving us in an endless loop of rebalancing. Not sure what changed between 2.7.0 and 2.8.0 to enforce this timeout or to lengthen the time to rebalance, but something did. Bumped the timeout to 3 minutes from 1 minute and we are good to go! > Upgrading connect from 2.7.0 to 2.8.0 causes worker instability > --- > > Key: KAFKA-13335 > URL: https://issues.apache.org/jira/browse/KAFKA-13335 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.8.0 >Reporter: John Gray >Priority: Major > Attachments: image-2021-09-29-09-15-18-172.png > > > After recently upgrading our connect cluster to 2.8.0 (via > strimzi+Kubernetes, brokers are still on 2.7.0), I am noticing that the > cluster is struggling to stabilize. Connectors are being > unassigned/reassigned/duplicated continuously, and never settling back down. > A downgrade back to 2.7.0 fixes things immediately. I have attached a picture > of our Grafana dashboards showing some metrics. We have a connect cluster > with 4 nodes, trying to maintain about 1000 connectors, each connector with a > maxTask of 1. > We are noticing a slow increase in memory usage with big random peaks of > tasks counts and thread counts. > I do also notice over the course of letting 2.8.0 run a huge increase in logs > stating that {code}ERROR Graceful stop of task (task name here) > failed.{code}, but the logs do not seem to indicate a reason. The connector > appears to be stopped only seconds after its creation. It appears to only > affect our source connectors. These logs stop after downgrading back to 2.7.0. 
> I am also seeing an increase in logs stating that {code}Couldn't instantiate > task (task name) because it has an invalid task configuration. This task will > not execute until reconfigured. > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [StartAndStopExecutor-connect-1-1] > org.apache.kafka.connect.errors.ConnectException: Task already exists in this > worker: (task name) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:512) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:834){code} > I am not sure what could be causing this, any insight would be appreciated! > I do notice Kafka 2.7.1/2.8.0 contains a bugfix related to connect rebalances > (KAFKA-10413). Is that fix potentially causing instability?
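The resolution here was configuration only: raising the distributed worker's `rebalance.timeout.ms` from its 1-minute default to 3 minutes, i.e. `rebalance.timeout.ms=180000` in the worker properties. A small sketch of applying that override programmatically follows. `WorkerRebalanceConfig` is a hypothetical helper, and whether 3 minutes is enough depends on how many connectors a given cluster has to rebalance.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper for building distributed-worker properties; not Kafka code.
final class WorkerRebalanceConfig {
    private WorkerRebalanceConfig() { }

    // Returns a copy of the base properties with rebalance.timeout.ms overridden.
    static Properties withRebalanceTimeout(Properties base, Duration timeout) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("rebalance.timeout.ms", Long.toString(timeout.toMillis()));
        return props;
    }
}
```

Usage: `WorkerRebalanceConfig.withRebalanceTimeout(baseProps, Duration.ofMinutes(3))` yields `rebalance.timeout.ms=180000`, matching the value the reporter settled on.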
[GitHub] [kafka] tombentley commented on pull request #12196: Implement SCRAM support in KRaft
tombentley commented on PR #12196: URL: https://github.com/apache/kafka/pull/12196#issuecomment-1148690209 @cmccabe is there a JIRA for this? Are you intending to implement support for `DescribeUserScramCredentials` in this PR too, or will that be done in a later PR? Thanks!
[GitHub] [kafka] ijuma commented on a diff in pull request #12260: MINOR: add note on IDEMPOTENT_WRITE ACL to 3.2.0 notable changes
ijuma commented on code in PR #12260: URL: https://github.com/apache/kafka/pull/12260#discussion_r891237342 ## docs/upgrade.html: ## @@ -67,6 +67,9 @@ Notable changes in 3 which meant that idempotence remained disabled unless the user had explicitly set enable.idempotence to true (See https://issues.apache.org/jira/browse/KAFKA-13598;>KAFKA-13598for more details). This issue was fixed and the default is properly applied in 3.0.1, 3.1.1, and 3.2.0. + +When the broker version is lower than 2.8.0, and the client version is 3.0.1, 3.1.1, and later, the IDEMPOTENT_WRITE permission is required to produce data Review Comment: I would say something like: Please read the relevant KIP section for the compatibility implications - a noteworthy item worth highlighting is... Also, we should use the same approach for this and other lines. It's unclear to me why we added a `ul` here, but not in the other case. I'd probably include this paragraph in the previous `li`
[jira] [Comment Edited] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign
[ https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551082#comment-17551082 ] Matthew de Detrich edited comment on KAFKA-8420 at 6/7/22 1:20 PM: --- So, in order to work on this issue, I tried writing a test to replicate what you are describing, and I came across something interesting. The test that I wrote looks like this: {code:java} @Test public void gracefulHandlingSwitchSubscribeToManualAssign() { ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); ConsumerRecords initialConsumerRecords = consumer.poll(Duration.ofMillis(0)); assertTrue(initialConsumerRecords.isEmpty()); consumer.unsubscribe(); consumer.assign(singleton(tp0)); client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator); consumer.poll(Duration.ofSeconds(1)); } {code} The problem I am currently getting is that {{consumer.poll(Duration.ofSeconds(1));}} is causing an infinite loop/deadlock (note that originally I had {{consumer.poll(Duration.ofSeconds(0));}}; however, this caused the {{consumer.poll}} method to short-circuit due to {{timer.notExpired()}} never executing, and hence it immediately returned {{ConsumerRecords.empty();}} without the consumer ever sending a request to trigger a sync-group response). After spending some time debugging, this is the piece of code that is not terminating: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251].
What I find highly confusing is that {{lookupCoordinator()}} does complete (in this case it immediately returns {{findCoordinatorFuture}} at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]). Yet for some reason the loop at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215] never terminates. It doesn't seem to detect that the future has finished, even though I believe it has. I am not sure whether this is related to what you mentioned, i.e. {quote}In the worst case (i.e. the leader keeps sending incompatible assignments), this would cause the consumer to fall into endless re-joins. {quote} Either I have found something else, or I am barking up the wrong tree. Do you have any insights into this, [~guozhang]?
[jira] [Commented] (KAFKA-8420) Graceful handling when consumer switches from subscribe to manual assign
[ https://issues.apache.org/jira/browse/KAFKA-8420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17551082#comment-17551082 ] Matthew de Detrich commented on KAFKA-8420: --- To work on this issue, I tried writing a test that replicates what you are describing, and I came across something interesting. The test I wrote looks like this:
{code:java}
@Test
public void gracefulHandlingSwitchSubscribeToManualAssign() {
    ConsumerMetadata metadata = createMetadata(subscription);
    MockClient client = new MockClient(time, metadata);
    initMetadata(client, Collections.singletonMap(topic, 1));
    Node node = metadata.fetch().nodes().get(0);
    KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId);
    consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
    prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
    ConsumerRecords initialConsumerRecords = consumer.poll(Duration.ofMillis(0));
    assertTrue(initialConsumerRecords.isEmpty());
    consumer.unsubscribe();
    consumer.assign(singleton(tp0));
    client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);
    consumer.poll(Duration.ofSeconds(1));
}
{code}
The problem I am hitting is that {{consumer.poll(Duration.ofSeconds(1));}} causes an infinite loop/deadlock. (Originally I had {{consumer.poll(Duration.ofSeconds(0));}}, but that made the {{consumer.poll}} method short-circuit because {{timer.notExpired()}} never executed. It therefore returned {{ConsumerRecords.empty();}} immediately, and the consumer never sent a request that would trigger a sync-group response.) After some debugging, I found that this is the piece of code that does not terminate: [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L250-L251].
What I find highly confusing is that {{lookupCoordinator()}} does complete (in this case it immediately returns {{findCoordinatorFuture}} at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L294]). Yet for some reason the loop at [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L215] never terminates. It doesn't seem to detect that the future has finished, even though I believe it has. I am not sure whether this is related to what you mentioned, i.e. {quote} In the worst case (i.e. the leader keeps sending incompatible assignments), this would cause the consumer to fall into endless re-joins. {quote} Either I have found something else, or I am barking up the wrong tree.
> Graceful handling when consumer switches from subscribe to manual assign
>
>
> Key: KAFKA-8420
> URL: https://issues.apache.org/jira/browse/KAFKA-8420
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Reporter: Guozhang Wang
> Assignee: Matthew de Detrich
> Priority: Major
>
> Today, if a consumer switches between subscribe (and hence relies on a group rebalance to get its assignment) and manual assign, it may cause unnecessary rebalances. For example:
> 1. consumer.subscribe();
> 2. consumer.poll(); // join-group request sent, returns empty because of the poll timeout
> 3. consumer.unsubscribe();
> 4. consumer.assign(..);
> 5. consumer.poll(); // sync-group request received, and the assigned partitions do not match the current subscription state. In this case it will try to re-join, which is not necessary.
> In the worst case (i.e. the leader keeps sending incompatible assignments), this would cause the consumer to fall into endless re-joins.
> Although it is not a very common usage scenario, it is still worth handling better than the status quo.
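To make the five steps in the KAFKA-8420 description concrete, here is a minimal, dependency-free model of the scenario. Every name in it is hypothetical: `SubscriptionModel`, `SyncGroupAction`, and the "topic-partition" string encoding. It is not how `SubscriptionState` or `ConsumerCoordinator` are implemented. It sketches one possible "graceful" rule under that assumption: once the consumer has left group management, a late SyncGroup assignment is dropped instead of triggering a re-join.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical subscription modes; not the consumer's real internal state machine.
enum SubscriptionMode { NONE, AUTO_TOPICS, USER_ASSIGNED }

// What the consumer could do with a SyncGroup assignment that arrives late.
enum SyncGroupAction { ACCEPT, REJOIN, IGNORE }

class SubscriptionModel {
    private SubscriptionMode mode = SubscriptionMode.NONE;
    private Set<String> subscribedTopics = Collections.emptySet();
    private Set<String> assignedPartitions = Collections.emptySet();

    void subscribe(Set<String> topics) {          // step 1
        mode = SubscriptionMode.AUTO_TOPICS;
        subscribedTopics = new HashSet<>(topics);
    }

    void unsubscribe() {                          // step 3
        mode = SubscriptionMode.NONE;
        subscribedTopics = Collections.emptySet();
        assignedPartitions = Collections.emptySet();
    }

    void assign(Set<String> partitions) {         // step 4
        mode = SubscriptionMode.USER_ASSIGNED;
        assignedPartitions = new HashSet<>(partitions);
    }

    // Step 5: the SyncGroup response for the earlier join arrives.
    // Partitions are encoded as "topic-partition" strings, e.g. "foo-0".
    SyncGroupAction onSyncGroupResponse(Set<String> groupAssignment) {
        if (mode != SubscriptionMode.AUTO_TOPICS) {
            // No longer using group management: drop the stale assignment, do not re-join.
            return SyncGroupAction.IGNORE;
        }
        for (String tp : groupAssignment) {
            int dash = tp.lastIndexOf('-');
            String topic = dash < 0 ? tp : tp.substring(0, dash);
            if (!subscribedTopics.contains(topic)) {
                // Assignment does not match the subscription: the status quo re-joins here.
                return SyncGroupAction.REJOIN;
            }
        }
        assignedPartitions = new HashSet<>(groupAssignment);
        return SyncGroupAction.ACCEPT;
    }

    Set<String> assignment() {
        return Collections.unmodifiableSet(assignedPartitions);
    }
}
```

Step 2 (the poll that sends the JoinGroup request) changes no state in this model, so it has no method. The interesting case is calling `onSyncGroupResponse` after `unsubscribe()` and `assign(..)`: the manual assignment is left untouched.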
[jira] [Updated] (KAFKA-13963) Topology Description ignores context.forward
[ https://issues.apache.org/jira/browse/KAFKA-13963?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tomasz Kaszuba updated KAFKA-13963: --- Component/s: streams
> Topology Description ignores context.forward
>
>
> Key: KAFKA-13963
> URL: https://issues.apache.org/jira/browse/KAFKA-13963
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.2
> Reporter: Tomasz Kaszuba
> Priority: Minor
>
> I have a simple topology:
> {code:scala}
> val topology = new Topology
> topology
>   .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>   .addProcessor(
>     "process",
>     new ProcessorSupplier[String, String] {
>       override def get(): Processor[String, String] = new ContextForwardProcessor()
>     },
>     "source"
>   )
> {code}
> And a simple processor that uses context.forward to forward messages:
> {code:scala}
> private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
>   override def process(key: String, value: String): Unit =
>     context().forward("key", "value", To.child("output"))
>
>   override def close(): Unit = ()
> }
> {code}
> When I call topology.describe() I receive this:
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: source (topics: [input])
>       --> process
>     Processor: process (stores: [])
>       --> none
>       <-- source
> {noformat}
> Setting aside the fact that this will not run (it throws a runtime exception), why is the To.child ignored?
> Taking it one step further, suppose I add multiple sinks to the topology like so:
> {code:scala}
> val topology = new Topology
> topology
>   .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
>   .addProcessor(
>     "process",
>     new ProcessorSupplier[String, String] {
>       override def get(): Processor[String, String] = new ContextForwardProcessor()
>     },
>     "source"
>   )
>   .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
>   .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
> {code}
> If I then have the processor output only to "output1", this is in no way reflected in the described topology graph.
> I assume this is by design, since interpreting what context.forward does is a lot more work. However, when I looked for this information in the Javadoc, I couldn't find it.
[jira] [Created] (KAFKA-13963) Topology Description ignores context.forward
Tomasz Kaszuba created KAFKA-13963: -- Summary: Topology Description ignores context.forward Key: KAFKA-13963 URL: https://issues.apache.org/jira/browse/KAFKA-13963 Project: Kafka Issue Type: Bug Affects Versions: 2.7.2 Reporter: Tomasz Kaszuba
I have a simple topology:
{code:scala}
val topology = new Topology
topology
  .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
  .addProcessor(
    "process",
    new ProcessorSupplier[String, String] {
      override def get(): Processor[String, String] = new ContextForwardProcessor()
    },
    "source"
  )
{code}
And a simple processor that uses context.forward to forward messages:
{code:scala}
private class ContextForwardProcessor extends AbstractProcessor[String, String]() {
  override def process(key: String, value: String): Unit =
    context().forward("key", "value", To.child("output"))

  override def close(): Unit = ()
}
{code}
When I call topology.describe() I receive this:
{noformat}
Topologies:
   Sub-topology: 0
    Source: source (topics: [input])
      --> process
    Processor: process (stores: [])
      --> none
      <-- source
{noformat}
Setting aside the fact that this will not run (it throws a runtime exception), why is the To.child ignored?
Taking it one step further, suppose I add multiple sinks to the topology like so:
{code:scala}
val topology = new Topology
topology
  .addSource("source", Serdes.stringSerde.deserializer, Serdes.stringSerde.deserializer, inputTopic)
  .addProcessor(
    "process",
    new ProcessorSupplier[String, String] {
      override def get(): Processor[String, String] = new ContextForwardProcessor()
    },
    "source"
  )
  .addSink("sink", "output1", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
  .addSink("sink2", "output2", Serdes.stringSerde.serializer(), Serdes.stringSerde.serializer(), "process")
{code}
If I then have the processor output only to "output1", this is in no way reflected in the described topology graph.
I assume this is by design, since interpreting what context.forward does is a lot more work. However, when I looked for this information in the Javadoc, I couldn't find it.
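The behaviour in the report is consistent with a topology description built purely from the edges declared via `addSource`/`addProcessor`/`addSink`, while `To.child(...)` is resolved only when `forward` actually runs. The toy model below uses hypothetical names and is not the Kafka Streams API. It illustrates that split under this assumption: the description can list only declared successors, and a runtime forward to an undeclared child fails. That failure matches the runtime exception mentioned in the report.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified topology: only edges declared at build time exist.
class TopologyModel {
    // node name -> declared children, in declaration order
    private final Map<String, List<String>> successors = new LinkedHashMap<>();

    TopologyModel addNode(String name, String... parents) {
        successors.putIfAbsent(name, new ArrayList<>());
        for (String parent : parents) {
            List<String> children = successors.get(parent);
            if (children == null) {
                throw new IllegalArgumentException("Unknown parent: " + parent);
            }
            children.add(name);
        }
        return this;
    }

    // Static view: all a describe()-style method can show, whatever process() does at runtime.
    List<String> describeSuccessors(String node) {
        return Collections.unmodifiableList(successors.getOrDefault(node, Collections.emptyList()));
    }

    // Runtime view: forwarding to a named child only works if that edge was declared.
    void forward(String from, String child) {
        if (!describeSuccessors(from).contains(child)) {
            throw new IllegalStateException("Unknown downstream node: " + child);
        }
    }
}
```

With two sinks declared, `describeSuccessors("process")` lists both of them, even if `process` only ever forwards to one. That is the second observation in the report.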
[jira] [Created] (KAFKA-13962) KRaft StripedReplicaPlacer should handle replicas in controlled shutdown
David Jacot created KAFKA-13962: --- Summary: KRaft StripedReplicaPlacer should handle replicas in controlled shutdown Key: KAFKA-13962 URL: https://issues.apache.org/jira/browse/KAFKA-13962 Project: Kafka Issue Type: Improvement Reporter: David Jacot Assignee: David Jacot [KIP-841|https://cwiki.apache.org/confluence/display/KAFKA/KIP-841%3A+Fenced+replicas+should+not+be+allowed+to+join+the+ISR+in+KRaft] added the in-controlled-shutdown state to the quorum controller. The StripedReplicaPlacer should be aware of brokers in this state and treat them like fenced brokers (placing replicas on them only as a last resort).
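The "last resort" rule can be sketched as a two-pass selection. The first pass takes brokers that are neither fenced nor in controlled shutdown. The second pass falls back to the remaining brokers only if the replication factor is still not met. This sketch ignores rack striping and randomization entirely, and `BrokerView`/`LastResortPlacer` are hypothetical names, not the `StripedReplicaPlacer` API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, minimal view of a broker's state as seen by a replica placer.
class BrokerView {
    final int id;
    final boolean fenced;
    final boolean inControlledShutdown;

    BrokerView(int id, boolean fenced, boolean inControlledShutdown) {
        this.id = id;
        this.fenced = fenced;
        this.inControlledShutdown = inControlledShutdown;
    }

    // Fenced and shutting-down brokers are treated the same: usable only as a last resort.
    boolean preferred() {
        return !fenced && !inControlledShutdown;
    }
}

class LastResortPlacer {
    static List<Integer> place(List<BrokerView> brokers, int replicationFactor) {
        List<Integer> chosen = new ArrayList<>();
        // Pass 1: active brokers only.
        for (BrokerView b : brokers) {
            if (chosen.size() == replicationFactor) break;
            if (b.preferred()) chosen.add(b.id);
        }
        // Pass 2: fenced or in-controlled-shutdown brokers, only if still short.
        for (BrokerView b : brokers) {
            if (chosen.size() == replicationFactor) break;
            if (!b.preferred()) chosen.add(b.id);
        }
        if (chosen.size() < replicationFactor) {
            throw new IllegalArgumentException(
                "Only " + chosen.size() + " brokers available, need " + replicationFactor);
        }
        return chosen;
    }
}
```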
[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
dajac commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r891098455
## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ##
@@ -682,25 +683,35 @@ private ApiError createTopic(CreatableTopic topic,
 short replicationFactor = topic.replicationFactor() == -1 ?
 defaultReplicationFactor : topic.replicationFactor();
 try {
-List<List<Integer>> replicas = clusterControl.replicaPlacer().place(new PlacementSpec(
+List<List<Integer>> partitions = clusterControl.replicaPlacer().place(new PlacementSpec(
 0,
 numPartitions,
 replicationFactor
 ), clusterDescriber);
-for (int partitionId = 0; partitionId < replicas.size(); partitionId++) {
-int[] r = Replicas.toArray(replicas.get(partitionId));
+for (int partitionId = 0; partitionId < partitions.size(); partitionId++) {
+List<Integer> replicas = partitions.get(partitionId);
+List<Integer> isr = replicas.stream().
+    filter(clusterControl::active).collect(Collectors.toList());
+// We need to have at least one replica in the ISR.
+if (isr.isEmpty()) isr.add(replicas.get(0));
Review Comment: Sure. https://issues.apache.org/jira/browse/KAFKA-13962. I will do this right away.
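The ISR rule in the diff above can be restated as a standalone sketch. It keeps only replicas whose broker is active, and if none are active it falls back to the first replica so the ISR is never empty. `InitialIsr` is a hypothetical helper, and the `isActive` predicate stands in for `clusterControl::active`. There is one deliberate difference from the diff. `Collectors.toList()` documents no guarantee that the returned list is mutable, so the sketch collects into an `ArrayList` explicitly before calling `add`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;

class InitialIsr {
    static List<Integer> compute(List<Integer> replicas, IntPredicate isActive) {
        // Keep only replicas whose broker the predicate reports as active.
        List<Integer> isr = replicas.stream()
            .filter(id -> isActive.test(id))
            .collect(Collectors.toCollection(ArrayList::new)); // guaranteed mutable
        // The ISR must contain at least one replica: fall back to the first (preferred) one.
        if (isr.isEmpty() && !replicas.isEmpty()) {
            isr.add(replicas.get(0));
        }
        return isr;
    }
}
```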
[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log
[ https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550943#comment-17550943 ] Luke Chen commented on KAFKA-13959: --- Sorry, I didn't see your last sentence. Thanks for the investigation! Looking forward to learning the root cause! :)
> Controller should unfence Broker with busy metadata log
> ---
>
> Key: KAFKA-13959
> URL: https://issues.apache.org/jira/browse/KAFKA-13959
> Project: Kafka
> Issue Type: Bug
> Components: kraft
> Affects Versions: 3.3.0
> Reporter: Jose Armando Garcia Sancio
> Priority: Blocker
>
> https://issues.apache.org/jira/browse/KAFKA-13955 showed that it is possible for the controller to not unfence a broker if the committed offset keeps increasing.
>
> One solution to this problem is to require the broker to catch up only to the last committed offset as of when it last sent a heartbeat. For example:
> # Broker sends a heartbeat with current offset of {{Y}}. The last commit offset is {{X}}. The controller remembers this last commit offset, call it {{X'}}.
> # Broker sends another heartbeat with current offset of {{Z}}. Unfence the broker if {{Z >= X}} or {{Z >= X'}}.
>
> This change should also set the default for MetadataMaxIdleIntervalMs back to 500.
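Below is a minimal sketch of the controller-side bookkeeping that the KAFKA-13959 description proposes. It assumes one remembered committed offset per broker. `UnfenceTracker` and `onHeartbeat` are hypothetical names; in Kafka this logic would live in the quorum controller's heartbeat handling, which this does not reproduce.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical bookkeeping for the proposal: remember X at each heartbeat as X'.
class UnfenceTracker {
    // brokerId -> last committed offset observed at that broker's previous heartbeat (X')
    private final Map<Integer, Long> committedAtPreviousHeartbeat = new HashMap<>();

    /**
     * Handles one heartbeat and reports whether the broker may be unfenced.
     *
     * @param brokerId            the heartbeating broker
     * @param brokerOffset        the broker's current metadata offset (Y, later Z)
     * @param lastCommittedOffset the controller's current last committed offset (X)
     */
    boolean onHeartbeat(int brokerId, long brokerOffset, long lastCommittedOffset) {
        // Store X for the next heartbeat and get the value stored last time, if any.
        Long previous = committedAtPreviousHeartbeat.put(brokerId, lastCommittedOffset);
        boolean caughtUpToCurrent = brokerOffset >= lastCommittedOffset;             // Z >= X
        boolean caughtUpToRemembered = previous != null && brokerOffset >= previous; // Z >= X'
        return caughtUpToCurrent || caughtUpToRemembered;
    }
}
```

Consider a log whose committed offset keeps moving ahead. Checking only `Z >= X` can keep failing indefinitely. The remembered `X'` gives the broker a fixed target it can actually reach.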
[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log
[ https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550941#comment-17550941 ] Luke Chen commented on KAFKA-13959: --- [~dengziming], if the heartbeat is sent every 10 ms, how could the broker fail to catch up with no-op records that are written every 500 ms?
[GitHub] [kafka] d-t-w commented on pull request #12260: MINOR: add note on IDEMPOTENT_WRITE ACL to 3.2.0 notable changes
d-t-w commented on PR #12260: URL: https://github.com/apache/kafka/pull/12260#issuecomment-1148497056 Thanks @showuon, I have updated it per your comment.
[GitHub] [kafka] cadonna commented on a diff in pull request #12235: KAFKA-13945: add bytes/records consumed and produced metrics
cadonna commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r891054931
## streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java: ##
@@ -114,6 +116,63 @@ public void shouldGetProcessAtSourceSensor() {
 verifySensor(() -> ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
 }
+@Test
+public void shouldGetRecordsAndBytesConsumedSensor() {
+final String recordsMetricNamePrefix = "records-consumed";
+final String bytesMetricNamePrefix = "bytes-consumed";
+final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
+final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
+when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
+.thenReturn(expectedSensor);
+when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
+.thenReturn(expectedSensor);
+when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);
+
+final Map<String, String> consumedTagMap = new HashMap<>(tagMap);
+consumedTagMap.put("topic", TOPIC_NAME);
+StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
Review Comment: Oh no! I only prototyped that code because the way it was written before didn't make sense with Mockito. I also did not know about closing static mocks in Mockito. I am really sorry!
[GitHub] [kafka] soarez commented on a diff in pull request #12248: KAFKA-13958: Expose logdirs total and usable space via Kafka API (KIP…
soarez commented on code in PR #12248: URL: https://github.com/apache/kafka/pull/12248#discussion_r891031695
## core/src/main/scala/kafka/server/ReplicaManager.scala: ##
@@ -812,13 +817,16 @@ class ReplicaManager(val config: KafkaConfig,
 new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
 .setErrorCode(Errors.NONE.code).setTopics(topicInfos)
+ .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
 case None =>
 new DescribeLogDirsResponseData.DescribeLogDirsResult().setLogDir(absolutePath)
 .setErrorCode(Errors.NONE.code)
+ .setTotalBytes(totalBytes).setUsableBytes(usableBytes)
 }
 } catch {
 case e: KafkaStorageException =>
+ e.printStackTrace()
Review Comment: Is this change intended? The exception is already logged on the following line.
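The diff sets `totalBytes` and `usableBytes` on each `DescribeLogDirsResult`, but the excerpt does not show where those values come from. One way to obtain them using only the JDK is `java.nio.file.FileStore`, sketched below. `LogDirSpace` is a hypothetical name, and this is not necessarily what the PR does.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical holder for the two per-log-dir values added to the response.
final class LogDirSpace {
    final long totalBytes;
    final long usableBytes;

    private LogDirSpace(long totalBytes, long usableBytes) {
        this.totalBytes = totalBytes;
        this.usableBytes = usableBytes;
    }

    static LogDirSpace of(Path logDir) {
        try {
            // The file store (volume) that contains the log directory.
            FileStore store = Files.getFileStore(logDir);
            // getUsableSpace() considers permissions/OS restrictions where supported;
            // it is only a hint and may change at any moment.
            return new LogDirSpace(store.getTotalSpace(), store.getUsableSpace());
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read file store for " + logDir, e);
        }
    }
}
```

Several log dirs on the same volume would all report the same numbers, because the values describe the volume rather than the directory.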
[jira] [Commented] (KAFKA-13959) Controller should unfence Broker with busy metadata log
[ https://issues.apache.org/jira/browse/KAFKA-13959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17550912#comment-17550912 ] dengziming commented on KAFKA-13959: While BrokerLifecycleManager is starting up, it sends a heartbeat every 10 milliseconds rather than every 2000 milliseconds: `scheduleNextCommunication(NANOSECONDS.convert(10, MILLISECONDS))`. That is already much shorter than 500 ms, so the cause of this bug is more complex; I need more time to investigate.
[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
dajac commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r890986175
## clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java: ##
@@ -4337,25 +4337,22 @@ public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuoru
 final Call call = new Call(
 "describeMetadataQuorum", calcDeadlineMs(now, options.timeoutMs()), provider) {
-private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
-Integer partition = 0;
-String topicName = response.getTopicNameByIndex(0);
-Integer leaderId = response.getPartitionLeaderId(topicName, partition);
+private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) {
 List<QuorumInfo.ReplicaState> voters = new ArrayList<>();
 List<QuorumInfo.ReplicaState> observers = new ArrayList<>();
-response.getVoterInfo(topicName, partition).forEach(v -> {
+partition.currentVoters().forEach(v -> {
 voters.add(new QuorumInfo.ReplicaState(v.replicaId(), v.logEndOffset(),
 OptionalLong.of(v.lastFetchTimestamp()), OptionalLong.of(v.lastCaughtUpTimestamp())));
Review Comment: When `lastFetchTimestamp` or `lastCaughtUpTimestamp` is not provided (i.e. equal to -1), shouldn't we return an empty optional instead of an optional containing -1?
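The review comment suggests mapping the `-1` sentinel to `OptionalLong.empty()` and every other value to `OptionalLong.of(value)`. Below is that mapping as a small hypothetical helper. Its name and placement are assumptions; in the PR, the conversion would sit where `QuorumInfo.ReplicaState` is built.

```java
import java.util.OptionalLong;

final class TimestampOptionals {
    // The response uses -1 to mean "timestamp not provided".
    static final long UNKNOWN_TIMESTAMP = -1L;

    static OptionalLong fromResponse(long timestamp) {
        return timestamp == UNKNOWN_TIMESTAMP ? OptionalLong.empty() : OptionalLong.of(timestamp);
    }
}
```

At the call site, this would read along the lines of `TimestampOptionals.fromResponse(v.lastFetchTimestamp())` in place of `OptionalLong.of(v.lastFetchTimestamp())`.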
[GitHub] [kafka] dajac commented on a diff in pull request #12206: KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag
dajac commented on code in PR #12206: URL: https://github.com/apache/kafka/pull/12206#discussion_r890984307
## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ##
@@ -4833,6 +4879,92 @@ public void testDescribeFeaturesFailure() {
 }
 }
+@Test
+public void testDescribeMetadataQuorumSuccess() throws Exception {
+try (final AdminClientUnitTestEnv env = mockClientEnv()) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(ApiKeys.DESCRIBE_QUORUM.id,
Review Comment: Ack. I was not aware of this bug.
[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
dajac commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r890963791
## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -485,12 +541,36 @@ Iterator<UsableBroker> usableBrokers() {
 id -> brokerRegistrations.get(id).rack());
 }
+/**
+ * Returns true if the broker is in fenced state; Returns false if it is
+ * not or if it does not exist.
+ */
 public boolean unfenced(int brokerId) {
Review Comment: That's right. However, it is used in many places in the tests, and I haven't found a replacement there that is as convenient as this predicate. I would keep it.
[GitHub] [kafka] viktorsomogyi commented on pull request #12180: KAFKA-13917: Avoid calling lookupCoordinator() in tight loop
viktorsomogyi commented on PR #12180: URL: https://github.com/apache/kafka/pull/12180#issuecomment-1148387148 @hachikuji, would you please review this small PR?
[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
dajac commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r890922496
## metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java: ##
@@ -132,15 +143,27 @@ ClusterControlManager build() {
 replicaPlacer = new StripedReplicaPlacer(new Random());
 }
 if (controllerMetrics == null) {
-throw new RuntimeException("You must specify controllerMetrics");
+throw new RuntimeException("You must specify ControllerMetrics");
+}
+if (featureControl == null) {
+featureControl = new FeatureControlManager.Builder().
+setLogContext(logContext).
+setSnapshotRegistry(snapshotRegistry).
+setQuorumFeatures(new QuorumFeatures(0, new ApiVersions(),
+QuorumFeatures.defaultFeatureMap(),
+singletonList(0))).
+setMetadataVersion(MetadataVersion.latest()).
+build();
Review Comment: Yeah, I agree. Let me change that.
[GitHub] [kafka] showuon commented on pull request #12261: MINOR: add java 8/scala 2.12 deprecation info in doc
showuon commented on PR #12261: URL: https://github.com/apache/kafka/pull/12261#issuecomment-1148327640 @ijuma, please take a look. Thanks.
[GitHub] [kafka] showuon opened a new pull request, #12261: MINOR: add java 8/scala 2.12 deprecation info in doc
showuon opened a new pull request, #12261: URL: https://github.com/apache/kafka/pull/12261 We deprecated Java 8 and Scala 2.12 in v3.0 (KIP-750 and KIP-751). We should add a note about this to the v3.0 notable changes, and also update the `Java version` section in the docs.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
dajac commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r890873379
## metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java: ##
@@ -213,12 +230,30 @@ public String toString() {
 bld.append("}");
 bld.append(", rack=").append(rack);
 bld.append(", fenced=").append(fenced);
+bld.append(", inControlledShutdown=").append(inControlledShutdown);
 bld.append(")");
 return bld.toString();
 }
-public BrokerRegistration cloneWithFencing(boolean fencing) {
-return new BrokerRegistration(id, epoch, incarnationId, listeners,
-supportedFeatures, rack, fencing);
+public Optional<BrokerRegistration> maybeCloneWith(
+Optional<Boolean> fencingChange,
+Optional<Boolean> inControlledShutdownChange
+) {
+boolean newFenced = fencingChange.orElse(fenced);
+boolean newInControlledShutdownChange = inControlledShutdownChange.orElse(inControlledShutdown);
+
+if (newFenced == fenced && newInControlledShutdownChange == inControlledShutdown)
+return Optional.empty();
Review Comment: Yeah, you're right. I am not sure why I used this Optional here.
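For reference, here are the merge semantics of the two `Optional<Boolean>` parameters in a trimmed-down, hypothetical form (`RegistrationFlags` is not a Kafka class). An empty `Optional` means "leave this flag unchanged". Instead of wrapping the result in an `Optional`, this variant returns the same instance when nothing changes. That is only one possible way to drop the `Optional` return type discussed in the thread.

```java
import java.util.Optional;

// Hypothetical stand-in for BrokerRegistration with only the two flags.
final class RegistrationFlags {
    final boolean fenced;
    final boolean inControlledShutdown;

    RegistrationFlags(boolean fenced, boolean inControlledShutdown) {
        this.fenced = fenced;
        this.inControlledShutdown = inControlledShutdown;
    }

    // An empty Optional keeps the current value of that flag.
    RegistrationFlags cloneWith(Optional<Boolean> fencingChange,
                                Optional<Boolean> inControlledShutdownChange) {
        boolean newFenced = fencingChange.orElse(fenced);
        boolean newInControlledShutdown = inControlledShutdownChange.orElse(inControlledShutdown);
        if (newFenced == fenced && newInControlledShutdown == inControlledShutdown) {
            return this; // no-op: callers can detect it with a reference comparison
        }
        return new RegistrationFlags(newFenced, newInControlledShutdown);
    }
}
```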
[GitHub] [kafka] dajac commented on a diff in pull request #12240: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 1)
dajac commented on code in PR #12240: URL: https://github.com/apache/kafka/pull/12240#discussion_r890868519
## metadata/src/main/java/org/apache/kafka/image/MetadataImage.java: ##
@@ -120,10 +121,16 @@ public AclsImage acls() {
 }
 public void write(Consumer<List<ApiMessageAndVersion>> out) {
+// We use the minimum KRaft metadata version if this image does
+// not have a specific version set.
+MetadataVersion metadataVersion = features.metadataVersion();
+if (metadataVersion.equals(MetadataVersion.UNINITIALIZED)) {
+metadataVersion = MetadataVersion.IBP_3_0_IV1;
+}
Review Comment: @mumrah is removing `UNINITIALIZED` in https://github.com/apache/kafka/pull/12250. We can remove this logic afterwards.
[GitHub] [kafka] showuon merged pull request #11778: KAFKA-10000: Use transactional producer for config topic (KIP-618)
showuon merged PR #11778:
URL: https://github.com/apache/kafka/pull/11778
[GitHub] [kafka] showuon commented on pull request #11778: KAFKA-10000: Use transactional producer for config topic (KIP-618)
showuon commented on PR #11778:
URL: https://github.com/apache/kafka/pull/11778#issuecomment-1148284973

Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / kafka.api.ConsumerBounceTest.testConsumerReceivesFatalExceptionWhenGroupPassesMaxSize()
```
[GitHub] [kafka] C0urante commented on pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on PR #11779:
URL: https://github.com/apache/kafka/pull/11779#issuecomment-1148257156

Thanks Tom, some great catches. I'm going to rebase tomorrow or Thursday, which should address the one or two outstanding comments; everything else should be addressed now and ready for another round.
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817727

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid signature."
+);
+}
+}
+if (requestValidationError != null) {
+callback.onCompletion(requestValidationError, null);
+return true;
+}
+}
+
+return false;
+}
+
+/**
+ * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+ * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+ * record to the config topic.
+ */
+class ZombieFencing {
+private final String connName;
+private final int tasksToRecord;
+private final int taskGen;
+private final FutureCallback fencingFollowup;
+private final KafkaFuture fencingFuture;
+
+public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+this.connName = connName;
+this.tasksToRecord = tasksToRecord;
+this.taskGen = taskGen;
+this.fencingFollowup = new FutureCallback<>();
+this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+// This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+// the future is already completed. Since that thread is the herder tick thread, we don't need
+// to perform follow-up logic through an additional herder request (and if we tried, it would lead
+// to deadlock)
+addOrRunRequest(
+this::onZombieFencingSuccess,
+fencingFollowup
+);
+awaitFollowup();
+return null;
+});
+}
+
+// Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+// Note that work here will be performed on the herder's tick thread, so it should not block for very long
+private Void onZombieFencingSuccess() throws TimeoutException {
+configBackingStore.refresh(1, TimeUnit.MINUTES);
+configState = configBackingStore.snapshot();
+if (taskGen < configState.taskConfigGeneration(connName)) {
+throw new ConnectRestException(
+Response.Status.CONFLICT.getStatusCode(),
+"Fencing failed because new task configurations were generated for the connector");
+}
+if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {

Review Comment:
Good point. I replaced `configLog.readToEnd().get()` with `configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS)`, which is what we use everywhere else in `KafkaConfigBackingStore` when reading to the end of the log to ensure that writes we just performed have landed. The downside is that it makes zombie fencing rounds more fragile, but that's better than squatting indefinitely on the herder thread. I also fixed another potential blocking issue around this area by shifting the call to `onZombieFencingSuccess` (or rather, the registration of it as a follow-up to the future returned by `Worker::fenceZombies`)
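The trade-off in the comment above (a bounded wait instead of an unbounded `get()`) can be sketched with the standard `Future.get(long, TimeUnit)` call. This is a hypothetical helper, not the `KafkaConfigBackingStore` code: the timeout is passed in by the caller because the actual value of `READ_TO_END_TIMEOUT_MS` isn't shown in this thread, and reporting a timeout as `false` is a choice made for the sketch.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedReadSketch {
    // Waits at most timeoutMs for a read-to-end future to complete.
    // Returns true if the read finished in time and false if it timed out,
    // so the calling thread (e.g. a herder tick thread) is never pinned indefinitely.
    static boolean awaitReadToEnd(Future<?> readToEnd, long timeoutMs) {
        try {
            readToEnd.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            // The caller decides how to fail the current operation (e.g. a fencing round).
            return false;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while reading to the end of the log", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to read to the end of the log", e.getCause());
        }
    }
}
```

A stalled read now fails a single round quickly instead of blocking every other herder request behind it.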
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817572

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid signature."
+);
+}
+}
+if (requestValidationError != null) {
+callback.onCompletion(requestValidationError, null);
+return true;
+}
+}
+
+return false;
+}
+
+/**
+ * Represents an active zombie fencing: that is, an in-progress attempt to invoke
+ * {@link Worker#fenceZombies(String, int, Map)} and then, if successful, write a new task count
+ * record to the config topic.
+ */
+class ZombieFencing {
+private final String connName;
+private final int tasksToRecord;
+private final int taskGen;
+private final FutureCallback fencingFollowup;
+private final KafkaFuture fencingFuture;
+
+public ZombieFencing(String connName, int tasksToFence, int tasksToRecord, int taskGen) {
+this.connName = connName;
+this.tasksToRecord = tasksToRecord;
+this.taskGen = taskGen;
+this.fencingFollowup = new FutureCallback<>();
+this.fencingFuture = worker.fenceZombies(connName, tasksToFence, configState.connectorConfig(connName)).thenApply(ignored -> {
+// This callback will be called on the same thread that invokes KafkaFuture::thenApply if
+// the future is already completed. Since that thread is the herder tick thread, we don't need
+// to perform follow-up logic through an additional herder request (and if we tried, it would lead
+// to deadlock)
+addOrRunRequest(
+this::onZombieFencingSuccess,
+fencingFollowup
+);
+awaitFollowup();
+return null;
+});
+}
+
+// Invoked after the worker has successfully fenced out the producers of old task generations using an admin client
+// Note that work here will be performed on the herder's tick thread, so it should not block for very long
+private Void onZombieFencingSuccess() throws TimeoutException {
+configBackingStore.refresh(1, TimeUnit.MINUTES);
+configState = configBackingStore.snapshot();
+if (taskGen < configState.taskConfigGeneration(connName)) {
+throw new ConnectRestException(
+Response.Status.CONFLICT.getStatusCode(),
+"Fencing failed because new task configurations were generated for the connector");
+}
+if (!writeToConfigTopicAsLeader(() -> configBackingStore.putTaskCountRecord(connName, tasksToRecord))) {
+throw new ConnectException("Failed to write connector task count record to config topic since worker was fenced out");
+}

Review Comment:
I pushed a change to https://github.com/apache/kafka/pull/11778 that basically does this; I'll rebase and update the new config topic writes introduced in this PR accordingly. One noteworthy difference is that the exception message is now always the same regardless of which operation failed; I tried to make it generic and user-friendly enough to work with that, but if that isn't good enough, we can add a message parameter to this method and use it as part of the message for the exception
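The fallback floated in the comment above, a message parameter so the exception names the write that failed, might look like the following. `writeOrThrow` and its parameters are hypothetical; `IllegalStateException` stands in for Connect's `ConnectException` to keep the example self-contained, and the write is modelled as a `BooleanSupplier` that returns `false` when this worker has been fenced out.

```java
import java.util.function.BooleanSupplier;

final class ConfigWriteSketch {
    // Performs a config-topic write and, if the write reports that this worker
    // was fenced out (returns false), throws an exception whose message names
    // the specific operation instead of one generic message for all writes.
    static void writeOrThrow(BooleanSupplier write, String operation) {
        if (!write.getAsBoolean()) {
            throw new IllegalStateException(
                    "Failed to " + operation + " since this worker was fenced out");
        }
    }
}
```

Each call site would then pass its own description, for example `"write connector task count record to config topic"`, keeping the single helper while restoring operation-specific messages.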
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817431

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -1745,6 +1958,23 @@ private boolean checkRebalanceNeeded(Callback callback) {
return false;
}
+/**
+ * Execute the given action and subsequent callback immediately if the current thread is the herder's tick thread,
+ * or use them to create and store a {@link DistributedHerderRequest} on the request queue and return the resulting request
+ * if not.
+ * @param action the action that should be run on the herder's tick thread
+ * @param callback the callback that should be invoked once the action is complete
+ * @return a new {@link DistributedHerderRequest} if one has been created and added to the request queue, and {@code null} otherwise
+ */
+DistributedHerderRequest addOrRunRequest(Callable action, Callback callback) {

Review Comment:
That works, yeah

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid signature."
+);
+}
+}
+if (requestValidationError != null) {
+callback.onCompletion(requestValidationError, null);

Review Comment:
This follows the same pattern as [AbstractHerder::maybeAddConfigErrors](https://github.com/apache/kafka/blob/09570f2540269cc1196c4c69cc7997d035159d1d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L711), which accepts a `Callback` but only invokes it on errors. This is useful if you'd like to establish some reusable logic that terminates control flow for a method and reports an error to a callback if something goes wrong, but otherwise allows control flow to continue and possibly fail later. I'll take a page out of `AbstractHerder::maybeAddConfigErrors`'s book and add Javadocs making note of this fact.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ##
@@ -2082,6 +2343,117 @@ private void resetActiveTopics(Collection connectors, Collection callback) {
+if (internalRequestValidationEnabled()) {
+ConnectRestException requestValidationError = null;
+if (requestSignature == null) {
+requestValidationError = new BadRequestException("Internal request missing required signature");
+} else if (!keySignatureVerificationAlgorithms.contains(requestSignature.keyAlgorithm())) {
+requestValidationError = new BadRequestException(String.format(
+"This worker does not support the '%s' key signing algorithm used by other workers. "
++ "This worker is currently configured to use: %s. "
++ "Check that all workers' configuration files permit the same set of signature algorithms, "
++ "and correct any misconfigured worker and restart it.",
+requestSignature.keyAlgorithm(),
+keySignatureVerificationAlgorithms
+));
+} else {
+if (!requestSignature.isValid(sessionKey)) {
+requestValidationError = new ConnectRestException(
+Response.Status.FORBIDDEN,
+"Internal request contained invalid
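The "only invoke the callback on errors" convention described in the `maybeAddConfigErrors` comment can be sketched as follows. This is a simplified, hypothetical version of the validation in the diff: it checks only for a missing signature and an unsupported key algorithm (not signature validity), uses `IllegalArgumentException` instead of Connect's REST exceptions, and models the callback as a `BiConsumer<Throwable, Void>` with the same `(error, result)` shape as `callback.onCompletion(...)`.

```java
import java.util.Set;
import java.util.function.BiConsumer;

final class RequestValidationSketch {
    // Returns true if the request was rejected; the callback has then already been
    // completed with the error, and the caller should simply return.
    // Returns false without touching the callback, so the caller stays responsible
    // for completing it once the rest of the request has been handled.
    static boolean maybeRejectRequest(String keyAlgorithm,
                                      Set<String> supportedAlgorithms,
                                      BiConsumer<Throwable, Void> callback) {
        String error = null;
        if (keyAlgorithm == null) {
            error = "Internal request missing required signature";
        } else if (!supportedAlgorithms.contains(keyAlgorithm)) {
            error = String.format(
                    "This worker does not support the '%s' key signing algorithm", keyAlgorithm);
        }
        if (error != null) {
            callback.accept(new IllegalArgumentException(error), null);
            return true;
        }
        return false;
    }
}
```

A caller would write `if (maybeRejectRequest(alg, supported, callback)) return;` and otherwise continue, completing the callback itself later. The boolean return is what lets the shared check end the caller's control flow without the callback ever being completed twice.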
[GitHub] [kafka] C0urante commented on a diff in pull request #11779: KAFKA-10000: Zombie fencing (KIP-618)
C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r890817292

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ##
@@ -582,6 +588,59 @@ public boolean startTask(
}
}
+/**
+ * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers
+ * for the specified number of source tasks from sending any more records.
+ * @param connName the name of the connector
+ * @param numTasks the number of tasks to fence out
+ * @param connProps the configuration of the connector; may not be null
+ * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+ */
+public KafkaFuture fenceZombies(String connName, int numTasks, Map connProps) {
+return fenceZombies(connName, numTasks, connProps, Admin::create);
+}
+
+// Allows us to mock out the Admin client for testing
+KafkaFuture fenceZombies(String connName, int numTasks, Map connProps, Function, Admin> adminFactory) {
+log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+ClassLoader savedLoader = plugins.currentThreadLoader();
+try {
+String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);
+final SourceConnectorConfig connConfig = new SourceConnectorConfig(plugins, connProps, config.topicCreationEnable());
+final Class connClass = plugins.connectorClass(
+ connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
+
+Map adminConfig = adminConfigs(
+connName,
+"connector-worker-adminclient-" + connName,
+config,
+connConfig,
+connClass,
+connectorClientConfigOverridePolicy,
+kafkaClusterId,
+ConnectorType.SOURCE);
+Admin admin = adminFactory.apply(adminConfig);

Review Comment:
Good catch, done.

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ##
@@ -582,6 +588,59 @@ public boolean startTask(
}
}
+/**
+ * Using the admin principal for this connector, perform a round of zombie fencing that disables transactional producers
+ * for the specified number of source tasks from sending any more records.
+ * @param connName the name of the connector
+ * @param numTasks the number of tasks to fence out
+ * @param connProps the configuration of the connector; may not be null
+ * @return a {@link KafkaFuture} that will complete when the producers have all been fenced out, or the attempt has failed
+ */
+public KafkaFuture fenceZombies(String connName, int numTasks, Map connProps) {
+return fenceZombies(connName, numTasks, connProps, Admin::create);
+}
+
+// Allows us to mock out the Admin client for testing
+KafkaFuture fenceZombies(String connName, int numTasks, Map connProps, Function, Admin> adminFactory) {
+log.debug("Fencing out {} task producers for source connector {}", numTasks, connName);
+try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
+ClassLoader savedLoader = plugins.currentThreadLoader();
+try {
+String connType = connProps.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ClassLoader connectorLoader = plugins.delegatingLoader().connectorLoader(connType);
+savedLoader = Plugins.compareAndSwapLoaders(connectorLoader);

Review Comment:
Ah yeah, I've been toying with that idea for a while but never got around to trying it out. It works pretty well in this case; the one wrinkle is that the signature of `AutoCloseable::close` declares a checked exception. To address that, I've added a new (internal) `LoaderSwap` class that implements `AutoCloseable` and removes that checked exception. If this looks good, we can retrofit other parts of the code base to leverage it in a follow-up.
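The `LoaderSwap` idea from the last comment can be sketched as below. This is a hypothetical reimplementation, not the class added in the PR: it assumes the swap targets the current thread's context class loader (as the name `compareAndSwapLoaders` suggests) and always swaps rather than comparing first. It relies on the Java rule that an overriding `close()` may declare fewer checked exceptions than `AutoCloseable::close`'s `throws Exception`.

```java
// Hypothetical sketch: swaps the current thread's context class loader on construction
// and restores the previous loader on close. Because close() declares no checked
// exception, a try-with-resources block using it needs no catch clause.
final class LoaderSwapSketch implements AutoCloseable {
    private final ClassLoader savedLoader;

    LoaderSwapSketch(ClassLoader loader) {
        Thread current = Thread.currentThread();
        this.savedLoader = current.getContextClassLoader();
        current.setContextClassLoader(loader);
    }

    // Dropping "throws Exception" is legal when overriding AutoCloseable::close.
    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(savedLoader);
    }
}
```

Usage would be `try (LoaderSwapSketch swap = new LoaderSwapSketch(connectorLoader)) { ... }`, replacing the manual save, swap, and `finally` restore that the quoted `fenceZombies` code performs with `savedLoader`.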
## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java: ##
@@ -138,6 +138,17 @@ public interface Herder {
*/
void putTaskConfigs(String connName, List> configs, Callback callback, InternalRequestSignature requestSignature);
+/**
+ * Fence out any older task generations for a source connector, and then write a record to the config topic
+ * indicating