[GitHub] [kafka] ijuma commented on a diff in pull request #12498: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum
ijuma commented on code in PR #12498:
URL: https://github.com/apache/kafka/pull/12498#discussion_r942043740

## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ##
@@ -84,21 +84,21 @@ class RaftManagerTest {
 @Test
 def testSentinelNodeIdIfBrokerRoleOnly(): Unit = {

Review Comment:
Does the test name need to change?

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14155) Kafka Stream - State transition from RUNNING to ERROR
Harsha Nadig created KAFKA-14155:

Summary: Kafka Stream - State transition from RUNNING to ERROR
Key: KAFKA-14155
URL: https://issues.apache.org/jira/browse/KAFKA-14155
Project: Kafka
Issue Type: Bug
Reporter: Harsha Nadig

2022-08-09 06:05:17 [-StreamThread-1] INFO c.g.o.d.k.c.KafkaStreamsConfig.lambda$null$0[82] State transition from RUNNING to ERROR
2022-08-09 06:05:17 [-StreamThread-1] ERROR o.a.k.streams.KafkaStreams.maybeSetError[443] stream-client [] All stream threads have died. The instance will be in error state and should be closed.
2022-08-09 06:05:17 [] INFO o.a.k.s.p.i.StreamThread.completeShutdown[935] stream-thread [-StreamThread-1] Shutdown complete
2022-08-09 06:05:17 [-StreamThread-1] ERROR c.g.o.d.k.c.KafkaStreamsConfig.uncaughtException[88] StreamsThread threadId: -StreamThread-1 TaskManager MetadataState: Tasks: throws exception: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=12_1, processor=KSTREAM-SOURCE-, topic=, partition=1, offset=, stacktrace=java.lang.NullPointerException
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:696)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1033)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:690)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
Caused by: java.lang.NullPointerException: null

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] andymg3 commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller
andymg3 commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r941967874

## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala: ##
@@ -364,15 +423,15 @@ class BrokerToControllerRequestThread(
 }
 override def doWork(): Unit = {
-if (activeControllerAddress().isDefined) {
+if (activeControllerOpt().isDefined) {
 super.pollOnce(Long.MaxValue)
 } else {
 debug("Controller isn't cached, looking for local metadata changes")
 controllerNodeProvider.get() match {
-case Some(controllerNode) =>
- info(s"Recorded new controller, from now on will use broker $controllerNode")
- updateControllerAddress(controllerNode)
- metadataUpdater.setNodes(Seq(controllerNode).asJava)
+case Some(controllerNodeAndEpoch) =>

Review Comment:
Is this where/how the `LeaderAndIsr` from the new controller eventually gets applied?
[GitHub] [kafka] andymg3 commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller
andymg3 commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r941967046

## core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala: ##
@@ -317,21 +373,24 @@ class BrokerToControllerRequestThread(
 override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
 val currentTimeMs = time.milliseconds()
 val requestIter = requestQueue.iterator()
+val controllerOpt = activeControllerOpt()
+
 while (requestIter.hasNext) {
 val request = requestIter.next
 if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
 requestIter.remove()
 request.callback.onTimeout()
 } else {
-val controllerAddress = activeControllerAddress()
-if (controllerAddress.isDefined) {
- requestIter.remove()
- return Some(RequestAndCompletionHandler(
-time.milliseconds(),
-controllerAddress.get,
-request.request,
-handleResponse(request)
- ))
+controllerOpt.foreach { activeController =>
+ if (activeController.epoch >= request.minControllerEpoch) {

Review Comment:
To confirm, this check is done on the broker side, right? You allude to this in the PR description: a more ideal solution might be for the controller to do the check server side, but that would require a version bump.
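The broker-side check discussed in this review can be illustrated with a small, self-contained Java sketch. It is not the Kafka implementation: `ControllerInfo`, `QueuedRequest`, and `EpochGatedQueue` are hypothetical stand-ins, the controller epochs 5 and 6 are made-up values, and the choice to keep scanning later requests when one is held back is an assumption, since the hunk above is cut off before that point. The sketch only shows the idea in the diff: expire requests that have waited too long, and release a request only when the known controller's epoch is at least the request's minimum controller epoch.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Optional;

// Hypothetical stand-in for "controller node plus epoch"; not a Kafka class.
final class ControllerInfo {
    final int nodeId;
    final int epoch;

    ControllerInfo(int nodeId, int epoch) {
        this.nodeId = nodeId;
        this.epoch = epoch;
    }
}

// Hypothetical queued request carrying the minimum controller epoch it may be sent to.
final class QueuedRequest {
    final String name;
    final long createdTimeMs;
    final int minControllerEpoch;
    boolean timedOut = false;

    QueuedRequest(String name, long createdTimeMs, int minControllerEpoch) {
        this.name = name;
        this.createdTimeMs = createdTimeMs;
        this.minControllerEpoch = minControllerEpoch;
    }
}

final class EpochGatedQueue {
    private final LinkedList<QueuedRequest> queue = new LinkedList<>();
    private final long retryTimeoutMs;

    EpochGatedQueue(long retryTimeoutMs) {
        this.retryTimeoutMs = retryTimeoutMs;
    }

    void enqueue(QueuedRequest request) {
        queue.add(request);
    }

    int pending() {
        return queue.size();
    }

    // Returns the first request that may be sent to the currently known controller.
    // Expired requests are dropped and flagged; requests that need a newer
    // controller epoch stay queued (assumption: later requests are still considered).
    Optional<QueuedRequest> nextSendable(long nowMs, Optional<ControllerInfo> controller) {
        Iterator<QueuedRequest> it = queue.iterator();
        while (it.hasNext()) {
            QueuedRequest request = it.next();
            if (nowMs - request.createdTimeMs >= retryTimeoutMs) {
                it.remove();
                request.timedOut = true;
            } else if (controller.isPresent()
                    && controller.get().epoch >= request.minControllerEpoch) {
                it.remove();
                return Optional.of(request);
            }
        }
        return Optional.empty();
    }
}

class EpochGateDemo {
    public static void main(String[] args) {
        EpochGatedQueue queue = new EpochGatedQueue(30_000L);
        // The request was triggered by a LeaderAndIsr from the controller with epoch 6.
        queue.enqueue(new QueuedRequest("AlterPartition", 0L, 6));

        Optional<QueuedRequest> toStale =
            queue.nextSendable(100L, Optional.of(new ControllerInfo(1, 5)));
        System.out.println("sendable to stale controller: " + toStale.isPresent());

        Optional<QueuedRequest> toNew =
            queue.nextSendable(200L, Optional.of(new ControllerInfo(2, 6)));
        System.out.println("sendable to new controller: " + toNew.isPresent());
    }
}
```

Because the gate lives on the broker, it needs no protocol change, which matches the trade-off raised in the review comment.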
[GitHub] [kafka] hachikuji opened a new pull request, #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller
hachikuji opened a new pull request, #12499:
URL: https://github.com/apache/kafka/pull/12499

It is currently possible for a leader to send an `AlterPartition` request to a stale controller that does not have the latest leader epoch, which the leader discovered through a `LeaderAndIsr` request. In this case, the stale controller returns `FENCED_LEADER_EPOCH`, which causes the partition leader to get stuck. This is a change in behavior following https://github.com/apache/kafka/pull/12032. Prior to that patch, the request would either be accepted (potentially incorrectly) if the `LeaderAndIsr` state matched that on the controller, or it would have returned `NOT_CONTROLLER`.

This patch fixes the problem by ensuring that `AlterPartition` is sent to a controller with an epoch which is at least as large as that of the controller which sent the `LeaderAndIsr` request. This ensures that the `FENCED_LEADER_EPOCH` error from the controller can be trusted.

A more elegant solution to this problem would probably be to include the controller epoch in the `AlterPartition` request, but this would require a version bump. Alternatively, we considered letting the controller return `UNKNOWN_LEADER_EPOCH` instead of `FENCED_LEADER_EPOCH` when the epoch is larger than what it has in its context. This too would likely require a version bump. Finally, we considered reverting https://github.com/apache/kafka/pull/12032, which would restore the looser validation logic that allows the controller to accept `AlterPartition` requests with larger leader epochs. We rejected this option because we feel it can lead to correctness violations.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-14081) Cannot get my MetricsReporter implementation to receive meaningful metrics
[ https://issues.apache.org/jira/browse/KAFKA-14081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577677#comment-17577677 ]

Rens Groothuijsen commented on KAFKA-14081:
---
I had a look at the source code, and it does appear that {{metricChange()}} is only being called when adding a metric, not when a metric changes. Given that the documentation of {{metricChange()}} says otherwise, I assume this was accidentally removed at some point.

> Cannot get my MetricsReporter implementation to receive meaningful metrics
> --
>
> Key: KAFKA-14081
> URL: https://issues.apache.org/jira/browse/KAFKA-14081
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.3.0
> Reporter: Gian Luca
> Priority: Minor
>
> I want to extract metrics from KafkaProducer to export them to our company
> monitoring solution. At first I went for implementing {{MetricsReporter}} and
> registering my implementation through the "metric.reporters" config property.
> The class is correctly registered as it receives metric updates through
> {{metricChange()}} while KafkaProducer is being used. The problem is that all
> the metric values are stuck at zero (NaN in older versions of Kafka), even
> the most trivial (e.g. 'record-send-total').
> If, instead of using a reporter, I simply poll the {{metrics()}} method of the
> KafkaProducer, then I see meaningful values: counters increasing over time,
> etc.
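If {{metricChange()}} is indeed only invoked when a metric is added, one way to still get live values is to keep a handle to each metric at registration time and read it later on a schedule. The Java sketch below models that pattern with hypothetical stand-ins only: `PollingReporter` is not the Kafka `MetricsReporter` interface, and a `DoubleSupplier` stands in for the metric object. In the real client the callback receives a `KafkaMetric`, whose `metricValue()` can be read at any later time; how a given reporter should schedule those reads is left open here.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.DoubleSupplier;

// Hypothetical reporter: remembers a live handle per metric instead of its value
// at registration time, which would typically still be zero.
final class PollingReporter {
    private final Map<String, DoubleSupplier> live = new ConcurrentHashMap<>();

    // Stand-in for the registration callback; called once when a metric is added.
    void metricChange(String metricName, DoubleSupplier currentValue) {
        live.put(metricName, currentValue);
    }

    // Stand-in for the removal callback.
    void metricRemoval(String metricName) {
        live.remove(metricName);
    }

    // Reads the current value of every registered metric; meant to be called
    // periodically, e.g. from a scheduled task that exports to a monitoring system.
    Map<String, Double> snapshot() {
        Map<String, Double> out = new TreeMap<>();
        for (Map.Entry<String, DoubleSupplier> entry : live.entrySet()) {
            out.put(entry.getKey(), entry.getValue().getAsDouble());
        }
        return out;
    }
}

class PollingReporterDemo {
    public static void main(String[] args) {
        final double[] recordsSent = {0.0};
        PollingReporter reporter = new PollingReporter();

        // Registration happens before any record is sent, so the value is zero here.
        reporter.metricChange("record-send-total", () -> recordsSent[0]);
        System.out.println(reporter.snapshot());

        // Later reads observe the updated counter through the retained handle.
        recordsSent[0] = 42.0;
        System.out.println(reporter.snapshot());
    }
}
```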
[jira] [Updated] (KAFKA-14154) Persistent URP after controller soft failure
[ https://issues.apache.org/jira/browse/KAFKA-14154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-14154:

Description:
We ran into a scenario where a partition leader was unable to expand the ISR after a soft controller failover. Here is what happened:

Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as the current controller.

1. Broker 1 loses its session in Zookeeper.
2. Broker 2 becomes the new controller.
3. During initialization, controller 2 removes 1 from the ISR. So state is updated: leader=2, isr=[2], leader epoch=11.
4. Broker 2 receives `LeaderAndIsr` from the new controller with leader epoch=11.
5. Broker 2 immediately tries to add replica 1 back to the ISR since it is still fetching and is caught up. However, the `BrokerToControllerChannelManager` is still pointed at controller 1, so that is where the `AlterPartition` is sent.
6. Controller 1 does not yet realize that it is not the controller, so it processes the `AlterPartition` request. It sees the leader epoch of 11, which is higher than what it has in its own context. Following changes to the `AlterPartition` validation in https://github.com/apache/kafka/pull/12032/files, the controller returns FENCED_LEADER_EPOCH.
7. After receiving the FENCED_LEADER_EPOCH from the old controller, the leader is stuck because it assumes that the error implies that another LeaderAndIsr request should be sent.

Prior to https://github.com/apache/kafka/pull/12032/files, the way we handled this case was a little different. We only verified that the leader epoch in the request was at least as large as the current epoch in the controller context. Anything higher was accepted. The controller would have attempted to write the updated state to Zookeeper. This update would have failed because of the controller epoch check; however, we would have returned NOT_CONTROLLER in this case, which is handled in `AlterPartitionManager`.

It is tempting to revert the logic, but the risk is in the idempotency check (https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369). If the AlterPartition request happened to match the state inside the old controller, the controller would consider the update successful and return no error. But if its state was already stale at that point, then that might cause the leader to incorrectly assume that the state had been updated.

One way to fix this problem without weakening the validation is to rely on the controller epoch in `AlterPartitionManager`. When we discover a new controller, we also discover its epoch, so we can pass that through. The `LeaderAndIsr` request already includes the controller epoch of the controller that sent it and we already propagate this through to `AlterPartition.submit`. Hence all we need to do is verify that the epoch of the current controller target is at least as large as the one discovered through the `LeaderAndIsr`.

was:
We ran into a scenario where a partition leader was unable to expand the ISR after a soft controller failover. Here is what happened:

Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as the current controller.

1. Broker 1 loses its session in Zookeeper.
2. Broker 2 becomes the new controller.
3. During initialization, controller 2 removes 1 from the ISR. So state is updated: leader=2, isr=[1, 2], leader epoch=11.
4. Broker 2 receives `LeaderAndIsr` from the new controller with leader epoch=11.
5. Broker 2 immediately tries to add replica 1 back to the ISR since it is still fetching and is caught up. However, the `BrokerToControllerChannelManager` is still pointed at controller 1, so that is where the `AlterPartition` is sent.
6. Controller 1 does not yet realize that it is not the controller, so it processes the `AlterPartition` request. It sees the leader epoch of 11, which is higher than what it has in its own context. Following changes to the `AlterPartition` validation in https://github.com/apache/kafka/pull/12032/files, the controller returns FENCED_LEADER_EPOCH.
7. After receiving the FENCED_LEADER_EPOCH from the old controller, the leader is stuck because it assumes that the error implies that another LeaderAndIsr request should be sent.

Prior to https://github.com/apache/kafka/pull/12032/files, the way we handled this case was a little different. We only verified that the leader epoch in the request was at least as large as the current epoch in the controller context. Anything higher was accepted. The controller would have attempted to write the updated state to Zookeeper. This update would have failed because of the controller epoch check; however, we would have returned NOT_CONTROLLER
[GitHub] [kafka] guozhangwang commented on pull request #12497: KAFKA-10199: Expose read only task from state updater
guozhangwang commented on PR #12497:
URL: https://github.com/apache/kafka/pull/12497#issuecomment-1210002259

The failures in `DefaultStateUpdaterTest` seem relevant?
[jira] [Updated] (KAFKA-14154) Persistent URP after controller soft failure
[ https://issues.apache.org/jira/browse/KAFKA-14154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-14154:

Fix Version/s: 3.3.0

> Persistent URP after controller soft failure
> --
>
> Key: KAFKA-14154
> URL: https://issues.apache.org/jira/browse/KAFKA-14154
> Project: Kafka
> Issue Type: Bug
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Priority: Blocker
> Fix For: 3.3.0
>
> We ran into a scenario where a partition leader was unable to expand the ISR
> after a soft controller failover. Here is what happened:
>
> Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as
> the current controller.
>
> 1. Broker 1 loses its session in Zookeeper.
> 2. Broker 2 becomes the new controller.
> 3. During initialization, controller 2 removes 1 from the ISR. So state is
> updated: leader=2, isr=[2], leader epoch=11.
> 4. Broker 2 receives `LeaderAndIsr` from the new controller with leader
> epoch=11.
> 5. Broker 2 immediately tries to add replica 1 back to the ISR since it is
> still fetching and is caught up. However, the
> `BrokerToControllerChannelManager` is still pointed at controller 1, so that
> is where the `AlterPartition` is sent.
> 6. Controller 1 does not yet realize that it is not the controller, so it
> processes the `AlterPartition` request. It sees the leader epoch of 11, which
> is higher than what it has in its own context. Following changes to the
> `AlterPartition` validation in
> https://github.com/apache/kafka/pull/12032/files, the controller returns
> FENCED_LEADER_EPOCH.
> 7. After receiving the FENCED_LEADER_EPOCH from the old controller, the
> leader is stuck because it assumes that the error implies that another
> LeaderAndIsr request should be sent.
>
> Prior to https://github.com/apache/kafka/pull/12032/files, the way we handled
> this case was a little different. We only verified that the leader epoch in
> the request was at least as large as the current epoch in the controller
> context. Anything higher was accepted. The controller would have attempted to
> write the updated state to Zookeeper. This update would have failed because
> of the controller epoch check; however, we would have returned NOT_CONTROLLER
> in this case, which is handled in `AlterPartitionManager`.
>
> It is tempting to revert the logic, but the risk is in the idempotency check
> (https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369).
> If the AlterPartition request happened to match the state inside the old
> controller, the controller would consider the update successful and return no
> error. But if its state was already stale at that point, then that might
> cause the leader to incorrectly assume that the state had been updated.
>
> One way to fix this problem without weakening the validation is to rely on
> the controller epoch in `AlterPartitionManager`. When we discover a new
> controller, we also discover its epoch, so we can pass that through. The
> `LeaderAndIsr` request already includes the controller epoch of the
> controller that sent it and we already propagate this through to
> `AlterPartition.submit`. Hence all we need to do is verify that the epoch of
> the current controller target is at least as large as the one discovered
> through the `LeaderAndIsr`.
[jira] [Created] (KAFKA-14154) Persistent URP after controller soft failure
Jason Gustafson created KAFKA-14154:

Summary: Persistent URP after controller soft failure
Key: KAFKA-14154
URL: https://issues.apache.org/jira/browse/KAFKA-14154
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson

We ran into a scenario where a partition leader was unable to expand the ISR after a soft controller failover. Here is what happened:

Initial state: leader=1, isr=[1,2], leader epoch=10. Broker 1 is acting as the current controller.

1. Broker 1 loses its session in Zookeeper.
2. Broker 2 becomes the new controller.
3. During initialization, controller 2 removes 1 from the ISR. So state is updated: leader=2, isr=[2], leader epoch=11.
4. Broker 2 receives `LeaderAndIsr` from the new controller with leader epoch=11.
5. Broker 2 immediately tries to add replica 1 back to the ISR since it is still fetching and is caught up. However, the `BrokerToControllerChannelManager` is still pointed at controller 1, so that is where the `AlterPartition` is sent.
6. Controller 1 does not yet realize that it is not the controller, so it processes the `AlterPartition` request. It sees the leader epoch of 11, which is higher than what it has in its own context. Following changes to the `AlterPartition` validation in https://github.com/apache/kafka/pull/12032/files, the controller returns FENCED_LEADER_EPOCH.
7. After receiving the FENCED_LEADER_EPOCH from the old controller, the leader is stuck because it assumes that the error implies that another LeaderAndIsr request should be sent.

Prior to https://github.com/apache/kafka/pull/12032/files, the way we handled this case was a little different. We only verified that the leader epoch in the request was at least as large as the current epoch in the controller context. Anything higher was accepted. The controller would have attempted to write the updated state to Zookeeper. This update would have failed because of the controller epoch check; however, we would have returned NOT_CONTROLLER in this case, which is handled in `AlterPartitionManager`.

It is tempting to revert the logic, but the risk is in the idempotency check (https://github.com/apache/kafka/pull/12032/files#diff-3e042c962e80577a4cc9bbcccf0950651c6b312097a86164af50003c00c50d37L2369). If the AlterPartition request happened to match the state inside the old controller, the controller would consider the update successful and return no error. But if its state was already stale at that point, then that might cause the leader to incorrectly assume that the state had been updated.

One way to fix this problem without weakening the validation is to rely on the controller epoch in `AlterPartitionManager`. When we discover a new controller, we also discover its epoch, so we can pass that through. The `LeaderAndIsr` request already includes the controller epoch of the controller that sent it and we already propagate this through to `AlterPartition.submit`. Hence all we need to do is verify that the epoch of the current controller target is at least as large as the one discovered through the `LeaderAndIsr`.
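The difference between the two validation styles described in this report can be made concrete with a short Java sketch. It models only what the description states: the older check rejected a request whose leader epoch was below the controller's, while the check after PR 12032 also rejects a higher one. The names `Outcome` and `LeaderEpochValidation` are hypothetical, and the error returned by the older check for a lower epoch is an assumption, since the report does not name it. The real controller code performs further checks that are omitted here.

```java
// Hypothetical outcome of the leader-epoch portion of AlterPartition validation.
enum Outcome { PROCEED, FENCED_LEADER_EPOCH }

final class LeaderEpochValidation {
    private LeaderEpochValidation() { }

    // Looser check (described as the behavior before PR 12032): the request epoch
    // must be at least the controller's; anything higher is accepted.
    // Assumption: a lower epoch was answered with FENCED_LEADER_EPOCH.
    static Outcome loose(int requestLeaderEpoch, int controllerLeaderEpoch) {
        return requestLeaderEpoch < controllerLeaderEpoch
            ? Outcome.FENCED_LEADER_EPOCH
            : Outcome.PROCEED;
    }

    // Stricter check (described as the behavior after PR 12032): a request epoch
    // that differs from the controller's, including a higher one, is fenced.
    static Outcome strict(int requestLeaderEpoch, int controllerLeaderEpoch) {
        return requestLeaderEpoch != controllerLeaderEpoch
            ? Outcome.FENCED_LEADER_EPOCH
            : Outcome.PROCEED;
    }
}

class LeaderEpochValidationDemo {
    public static void main(String[] args) {
        int staleControllerEpoch = 10; // what controller 1 still has in its context
        int requestEpoch = 11;         // what broker 2 learned from the new controller

        // Step 6 of the report: the stale controller fences the newer epoch.
        System.out.println("strict: " + LeaderEpochValidation.strict(requestEpoch, staleControllerEpoch));

        // Older behavior: the request passes this check; per the report, the
        // subsequent write then fails the controller epoch check and the broker
        // receives NOT_CONTROLLER instead.
        System.out.println("loose:  " + LeaderEpochValidation.loose(requestEpoch, staleControllerEpoch));
    }
}
```

Under the strict check a stale controller cannot be told apart from a current one by the error alone, which is why the proposed fix gates the request on the broker using the controller epoch.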
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12479: MINOR; Convert some integration tests to run with the KRaft modes
ahuang98 commented on code in PR #12479:
URL: https://github.com/apache/kafka/pull/12479#discussion_r941879338

## core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala: ##
@@ -20,23 +20,37 @@
 import java.util
 import java.util.concurrent.ExecutionException
 import java.util.{Collections, Optional, Properties}
-import scala.collection.Seq
-import kafka.log.UnifiedLog
-import kafka.zk.TopicPartitionZNode
-import kafka.utils.TestUtils
-import kafka.server.{KafkaConfig, KafkaServer, QuorumTestHarness}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, Test}
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.controller.{OfflineReplica, PartitionAndReplica, ReplicaAssignment, ReplicaDeletionSuccessful}
+import kafka.log.UnifiedLog
+import kafka.server
+import kafka.server.{KafkaBroker, KafkaConfig, KafkaServer, QuorumTestHarness}
+import kafka.utils.Implicits.PropertiesOps
+import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.zk.TopicPartitionZNode
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewPartitionReassignment, NewPartitions}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicPartition, TopicPartitionInfo}
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.junit.jupiter.api.Assertions.{assertTrue, _}
+import org.junit.jupiter.api.function.Executable
+import org.junit.jupiter.api.{AfterEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
+import org.opentest4j.AssertionFailedError
+
+import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 class DeleteTopicTest extends QuorumTestHarness {
- var servers: Seq[KafkaServer] = Seq()
+ var brokers: Seq[KafkaBroker] = Seq()

Review Comment:
I thought it would be better to bring up brokers manually for each test, since many require different sets of configs (KRaft doesn't yet support dynamic reconfiguration) and different numbers of brokers (we _could_ just always bring up the max required).
[GitHub] [kafka] hachikuji commented on pull request #12498: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum
hachikuji commented on PR #12498:
URL: https://github.com/apache/kafka/pull/12498#issuecomment-1209985247

cc @dengziming @niket-goel @jsancio
[GitHub] [kafka] hachikuji opened a new pull request, #12498: KAFKA-13986; Brokers should include node.id in fetches to metadata quorum
hachikuji opened a new pull request, #12498:
URL: https://github.com/apache/kafka/pull/12498

Currently, we do not set the replicaId in fetches from brokers to the metadata quorum. It is useful to do so since that allows us to debug replication using the `DescribeQuorum` API.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on pull request #12458: MINOR: Adds KRaft versions of most streams system tests
guozhangwang commented on PR #12458:
URL: https://github.com/apache/kafka/pull/12458#issuecomment-1209983916

Re-triggered the jenkins build.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12458: MINOR: Adds KRaft versions of most streams system tests
guozhangwang commented on code in PR #12458:
URL: https://github.com/apache/kafka/pull/12458#discussion_r941876941

## tests/kafkatest/tests/streams/streams_broker_bounce_test.py: ##
@@ -205,11 +212,17 @@ def collect_results(self, sleep_time_secs):
 return data
 @cluster(num_nodes=7)
+@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
+broker_type=["leader"],
+num_threads=[1, 3],
+sleep_time_secs=[120],
+metadata_quorum=[quorum.remote_kraft])

Review Comment:
Cool, thanks!
[jira] [Updated] (KAFKA-14153) UnknownTopicOrPartitionException should include the topic/partition in the returned exception message
[ https://issues.apache.org/jira/browse/KAFKA-14153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alyssa Huang updated KAFKA-14153:

Description:
Exception would be more useful if it included the topic or partition that was not found. Message right now is just
`This server does not host this topic-partition.`

Background: https://github.com/apache/kafka/pull/12479#discussion_r938988993

was:
Exception would be more useful if it included the topic or partition that was not found.

Background: https://github.com/apache/kafka/pull/12479#discussion_r938988993

> UnknownTopicOrPartitionException should include the topic/partition in the
> returned exception message
> -
>
> Key: KAFKA-14153
> URL: https://issues.apache.org/jira/browse/KAFKA-14153
> Project: Kafka
> Issue Type: Improvement
> Reporter: Alyssa Huang
> Priority: Minor
>
> Exception would be more useful if it included the topic or partition that was
> not found. Message right now is just
> `This server does not host this topic-partition.`
> Background: https://github.com/apache/kafka/pull/12479#discussion_r938988993
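To make the request concrete, here is a minimal Java sketch of an exception whose message names the missing topic-partition. `TopicPartitionNotHostedException` is a hypothetical class written for this illustration, not Kafka's `UnknownTopicOrPartitionException`, and the `topic-partition` formatting is only one possible choice; the issue does not say how or where the real message would be built.

```java
// Hypothetical exception for illustration only; not part of Kafka.
final class TopicPartitionNotHostedException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    // The generic message quoted in the issue, without the trailing period.
    static final String BASE_MESSAGE = "This server does not host this topic-partition";

    TopicPartitionNotHostedException(String topic, int partition) {
        super(describe(topic, partition));
    }

    // Appends the missing topic-partition to the generic message.
    static String describe(String topic, int partition) {
        return BASE_MESSAGE + ": " + topic + "-" + partition;
    }
}

class TopicPartitionMessageDemo {
    public static void main(String[] args) {
        try {
            throw new TopicPartitionNotHostedException("orders", 3);
        } catch (TopicPartitionNotHostedException e) {
            // The log line now says which topic-partition was missing.
            System.out.println(e.getMessage());
        }
    }
}
```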
[GitHub] [kafka] jolshan commented on pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode
jolshan commented on PR #12487:
URL: https://github.com/apache/kafka/pull/12487#issuecomment-1209962842

Failed tests passed locally:
[Build / JDK 8 and Scala 2.12 / kafka.admin.DeleteOffsetsConsumerGroupCommandIntegrationTest.testDeleteOffsetsNonExistingGroup()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.admin/DeleteOffsetsConsumerGroupCommandIntegrationTest/Build___JDK_8_and_Scala_2_12___testDeleteOffsetsNonExistingGroup__/)
[Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.server/KRaftClusterTest/Build___JDK_8_and_Scala_2_12___testCreateClusterAndPerformReassignment__/)
[Build / JDK 11 and Scala 2.13 / kafka.log.LogCleanerIntegrationTest.testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics()](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/kafka.log/LogCleanerIntegrationTest/Build___JDK_11_and_Scala_2_13___testMarksPartitionsAsOfflineAndPopulatesUncleanableMetrics__/)
[Build / JDK 17 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12487/6/testReport/junit/org.apache.kafka.connect.integration/ConnectorRestartApiIntegrationTest/Build___JDK_17_and_Scala_2_13___testMultiWorkerRestartOnlyConnector/)
[GitHub] [kafka] cmccabe commented on pull request #12448: KAFKA-14114: Adding Metadata Log Processing Error Related Metrics
cmccabe commented on PR #12448:
URL: https://github.com/apache/kafka/pull/12448#issuecomment-1209956699

Merged from command line with the changes we discussed. Thanks, @niket-goel
[GitHub] [kafka] cmccabe closed pull request #12496: KAFKA-14114: Add Metadata Error Related Metrics
cmccabe closed pull request #12496: KAFKA-14114: Add Metadata Error Related Metrics URL: https://github.com/apache/kafka/pull/12496
[jira] [Created] (KAFKA-14153) UnknownTopicOrPartitionException should include the topic/partition in the returned exception message
Alyssa Huang created KAFKA-14153: Summary: UnknownTopicOrPartitionException should include the topic/partition in the returned exception message Key: KAFKA-14153 URL: https://issues.apache.org/jira/browse/KAFKA-14153 Project: Kafka Issue Type: Improvement Reporter: Alyssa Huang Exception would be more useful if it included the topic or partition that was not found. Background: https://github.com/apache/kafka/pull/12479#discussion_r938988993 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12479: MINOR; Convert some integration tests to run with the KRaft modes
ahuang98 commented on code in PR #12479: URL: https://github.com/apache/kafka/pull/12479#discussion_r941848002 ## core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala: ## @@ -37,29 +43,57 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa def generateConfigs = (0 until numServers) map { node => - TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) + TestUtils.createBrokerConfig(node, zkConnectOrNull, enableControlledShutdown = false, rack = Some((node / 2).toString)) } map (KafkaConfig.fromProps(_, overridingProps)) private val topic = "topic" - @Test - def testAutoCreateTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAutoCreateTopic(quorum: String): Unit = { val producer = TestUtils.createProducer(bootstrapServers()) +val props = new Properties() +props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) +val adminClient = Admin.create(props) + +TestUtils.waitUntilTrue( + () => brokers.head.metadataCache.getAliveBrokers().size == numServers, + "Timed out waiting for all brokers to become unfenced") + try { // Send a message to auto-create the topic val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) assertEquals(0L, producer.send(record).get.offset, "Should have offset 0") - // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map { case (topicPartition, replicas) => -topicPartition.partition -> replicas + val partition = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get(). 
+partitions().stream().filter(_.partition == 0).findAny() + assertTrue(partition.isPresent, "Partition [topic,0] should exist") + assertFalse(partition.get().leader().isEmpty, "Leader should exist for partition [topic,0]") + + val assignment = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues.asScala.map { +case (topicName, topicDescriptionFuture) => + try topicName -> topicDescriptionFuture.get + catch { +case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] => + throw new ExecutionException( +new UnknownTopicOrPartitionException(s"Topic $topicName not found.")) Review Comment: https://issues.apache.org/jira/browse/KAFKA-14153 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14069) Allow custom configuration of foreign key join internal topics
[ https://issues.apache.org/jira/browse/KAFKA-14069?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577634#comment-17577634 ] Guozhang Wang commented on KAFKA-14069: --- That's interesting. Could you check a few additional things? 1) Does your app commit successfully from time to time (the delete-records request should be sent aligned with the commit)? 2) Did you see the following log lines in your logs? ``` "Sent delete-records request: {}" ``` ``` "Previous delete-records request has failed: {}. Try sending the new request now" ``` > Allow custom configuration of foreign key join internal topics > -- > > Key: KAFKA-14069 > URL: https://issues.apache.org/jira/browse/KAFKA-14069 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.8.0 >Reporter: Emmanuel Brard >Priority: Minor > > Internal topics supporting foreign key joins (-subscription-registration-topic > and -subscription-response-topic) are automatically created with _infinite > retention_ (retention.ms=-1, retention.bytes=-1). > As far as I understand those topics are used for communication between tasks > that are involved in the FK, the intermediate result though is persisted in a > compacted topic (-subscription-store-changelog). > This means, if I understood right, that during normal operation of the stream > application, once a message is read from the registration/subscription topic, > it will not be read again, even in case of recovery (the position in those > topics is committed). > Because we have very large tables being joined this way with very high > changes frequency, we end up with FK internal topics in the order of 1 or 2 > TB. This is complicated to maintain especially in terms of disk space. 
> I was wondering if: > - this infinite retention is really a required configuration and if not > - this infinite retention could be replaced with a configurable one (for > example of 1 week, meaning that I accept that in case of failure I must this > my app within one week) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14070) Improve documentation for queryMetadataForKey for state stores with Processor API
[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577633#comment-17577633 ] Guozhang Wang commented on KAFKA-14070: --- Hello [~balajirrao] Thanks for the updated description! That's much clearer now. Yes, the `queryMetadataForKey` overload without the {{StreamPartitioner}} does not work very well with the processor API since it assumes the key of the store is inherited from the key of the input (or repartition) topic. If the user is storing the key in a different manner, they'd need to use the other overload that requires a {{StreamPartitioner}}. But like you brought up in the second example, if the partitioning scheme does not depend on the key (e.g. if it's by the value) then that function does not help either. I think in the near term it's definitely necessary to improve our docs clarifying that for the PAPI users --- would you like to file a PR? In the long run, we should consider generalizing this function to allow users to provide any form of partitioning scheme. > Improve documentation for queryMetadataForKey for state stores with Processor > API > - > > Key: KAFKA-14070 > URL: https://issues.apache.org/jira/browse/KAFKA-14070 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 3.2.0 >Reporter: Balaji Rao >Priority: Minor > > Using {{queryMetadataForKey}} for state stores with Processor API is tricky. > One could use state stores in Processor API in ways that would make it > impossible to use {{queryMetadataForKey}} with just a key alone - one would > have to know the input record's key. This could lead to the method being > called with incorrect expectations. The documentation could be improved > around this, and around using state stores with the Processor API in general. 
> Example Scala snippet: > {code:scala} > val input = streamsBuilder.stream( > "input-topic", > Consumed.`with`(Serdes.intSerde, Serdes.stringSerde) > ) > private val storeBuilder = Stores > .keyValueStoreBuilder[String, String]( > Stores.inMemoryKeyValueStore("store"), > Serdes.stringSerde, > Serdes.stringSerde > ) > streamsBuilder.addStateStore(storeBuilder) > input.process( > new ProcessorSupplier[Int, String, Void, Void] { > override def get(): Processor[Int, String, Void, Void] = > new Processor[Int, String, Void, Void] { > var store: KeyValueStore[String, String] = _ > override def init(context: ProcessorContext[Void, Void]): Unit = { > super.init(context) > store = context.getStateStore("store") > } > override def process(record: Record[Int, String]): Unit = { > ('a' to 'j').foreach(x => > store.put(s"${record.key}-$x", record.value) > ) > } > } > }, > "store" > ) > {code} > In the code sample above, AFAICT there is no way the possible partition of > the {{store}} containing the key {{"1-a"}} could be determined by calling > {{queryMetadataForKey}} with the string {{{}"1-a"{}}}. One has to call > {{queryMetadataForKey}} with the record's key that produced {{{}"1-a"{}}}, in > this case the {{Int}} 1, to find the partition. > > Example 2: > The same as above, but with a different {{process}} method. > {code:scala} > override def process(record: Record[Int, String]): Unit = { > ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}")) > }{code} > In this case the key {{"a"}} could exist in multiple partitions, with > different values in different partitions. In this case, AFAICT, one must use > {{queryMetadataForKey}} with an {{Int}} to determine the partition where a > given {{String}} would be stored. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14152) Add logic to fence kraft brokers which have fallen behind in replication
Jason Gustafson created KAFKA-14152: --- Summary: Add logic to fence kraft brokers which have fallen behind in replication Key: KAFKA-14152 URL: https://issues.apache.org/jira/browse/KAFKA-14152 Project: Kafka Issue Type: Improvement Reporter: Jason Gustafson When a kraft broker registers with the controller, it must catch up to the current metadata before it is unfenced. However, once it has been unfenced, it only needs to continue sending heartbeats to remain unfenced. It can fall arbitrarily behind in the replication of the metadata log and remain unfenced. We should consider whether there is an inverse condition that we can use to fence a broker that has fallen behind. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12479: MINOR; Convert some integration tests to run with the KRaft modes
ahuang98 commented on code in PR #12479: URL: https://github.com/apache/kafka/pull/12479#discussion_r941771144 ## core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala: ## @@ -37,29 +43,57 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa def generateConfigs = (0 until numServers) map { node => - TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) + TestUtils.createBrokerConfig(node, zkConnectOrNull, enableControlledShutdown = false, rack = Some((node / 2).toString)) } map (KafkaConfig.fromProps(_, overridingProps)) private val topic = "topic" - @Test - def testAutoCreateTopic(): Unit = { + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName) + @ValueSource(strings = Array("zk", "kraft")) + def testAutoCreateTopic(quorum: String): Unit = { val producer = TestUtils.createProducer(bootstrapServers()) +val props = new Properties() +props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) +val adminClient = Admin.create(props) + +TestUtils.waitUntilTrue( + () => brokers.head.metadataCache.getAliveBrokers().size == numServers, + "Timed out waiting for all brokers to become unfenced") + try { // Send a message to auto-create the topic val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) assertEquals(0L, producer.send(record).get.offset, "Should have offset 0") - // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map { case (topicPartition, replicas) => -topicPartition.partition -> replicas + val partition = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic).get(). 
+partitions().stream().filter(_.partition == 0).findAny() + assertTrue(partition.isPresent, "Partition [topic,0] should exist") + assertFalse(partition.get().leader().isEmpty, "Leader should exist for partition [topic,0]") + + val assignment = adminClient.describeTopics(Collections.singleton(topic)).topicNameValues.asScala.map { +case (topicName, topicDescriptionFuture) => + try topicName -> topicDescriptionFuture.get + catch { +case t: ExecutionException if t.getCause.isInstanceOf[UnknownTopicOrPartitionException] => + throw new ExecutionException( +new UnknownTopicOrPartitionException(s"Topic $topicName not found.")) + } + }.flatMap { +case (_, topicDescription) => topicDescription.partitions.asScala.map { info => + (info.partition, info.replicas.asScala.map(_.id)) +} } - val brokerMetadatas = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced) + + val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers().toList val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1") assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap) - checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor) -} finally producer.close() + checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor, +verifyLeaderDistribution = false) Review Comment: Still investigating this one, I believe @rondagostino is taking a look -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #12486: MINOR: BrokerMetadataSnapshotter must avoid exceeding batch size
cmccabe merged PR #12486: URL: https://github.com/apache/kafka/pull/12486
[GitHub] [kafka] bbejeck commented on pull request #12494: MINOR: Update site docs for ASF compliance
bbejeck commented on PR #12494: URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209815424 > Should we fix the issues I reported in [apache/kafka-site#433 (comment)](https://github.com/apache/kafka-site/pull/433#issuecomment-1205049698) before merging the images back to Kafka? I've taken a stab at addressing those comments - this may be the best I can do
[jira] [Commented] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent
[ https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17577552#comment-17577552 ] Sagar Rao commented on KAFKA-14138: --- Thanks [~guozhang] . I went through KIP-691 and got some ideas around the expectation. Having said that, it looks like the KIP isn't implemented yet and some of the new exceptions proposed aren't part of the codebase. > The Exception Throwing Behavior of Transactional Producer is Inconsistent > - > > Key: KAFKA-14138 > URL: https://issues.apache.org/jira/browse/KAFKA-14138 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Critical > > There's an issue with inconsistent error throwing inside Kafka Producer when > transactions are enabled. In short, there are two places where the received > error code from the brokers would be eventually thrown to the caller: > * Recorded on the batch's metadata, via "Sender#failBatch" > * Recorded on the txn manager, via "txnManager#handleFailedBatch". > The former would be thrown from 1) the `Future` returned from > the `send`; or 2) the `callback` inside `send(record, callback)`. Whereas, > the latter would be thrown from `producer.send()` directly in which we call > `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown > from the former, it's not wrapped hence the direct exception (e.g. > ClusterAuthorizationException), whereas in the latter it's wrapped as, e.g. > KafkaException(ClusterAuthorizationException). Which one is thrown > depends on a race condition, since we cannot control whether the previous > produceRequest's error has been sent back by the time the caller thread > calls `txnManager.maybeAddPartition`. > For example, consider the following sequence for an idempotent producer: > 1. caller thread: within future = producer.send(), call > recordAccumulator.append > 2. 
sender thread: drain the accumulator, send the produceRequest and get the > error back. > 3. caller thread: within future = producer.send(), call > txnManager.maybeAddPartition, in which we would check `maybeFailWithError` > before `isTransactional`. > 4. caller thread: future.get() > In a sequence where 3) happened before 2), we would only get the raw > exception at step 4; in a sequence where 2) happened before 3), we would > throw the exception immediately at 3). > This inconsistent error throwing is pretty annoying for users since they'd > need to handle both cases, but many of them are not aware of this > subtlety. We should make the error throwing consistent, e.g. we should > consider: 1) which errors would be thrown from callback / future.get, and > which would be thrown from the `send` call directly, and these errors should > preferably be non-overlapping, 2) whether or not we wrap the raw error, we > should do so consistently.
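A minimal sketch of what "handling both cases" means on the caller side, given the race described above. It uses only JDK types: `AuthError` and `WrappedError` are hypothetical stand-ins for ClusterAuthorizationException and KafkaException(ClusterAuthorizationException), and `SendErrorNormalizer` and the supplier modelling `producer.send(...)` are assumptions made for illustration, not part of the Kafka client API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Hypothetical stand-in for a raw broker error such as ClusterAuthorizationException.
class AuthError extends RuntimeException {
    AuthError(String message) { super(message); }
}

// Hypothetical stand-in for a wrapper such as KafkaException(cause).
class WrappedError extends RuntimeException {
    WrappedError(Throwable cause) { super(cause); }
}

final class SendErrorNormalizer {
    private SendErrorNormalizer() { }

    // Peels off wrapper layers so both error paths end at the same underlying error.
    static Throwable unwrap(Throwable error) {
        Throwable current = error;
        while ((current instanceof ExecutionException || current instanceof WrappedError)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Runs "send, then wait on the future" and returns the normalized failure,
    // or null if the send succeeded. The supplier models a producer.send(...) call.
    static Throwable sendAndCollect(Supplier<Future<?>> send) {
        try {
            send.get().get();
            return null;
        } catch (ExecutionException | RuntimeException e) {
            // Either thrown by Future.get (raw error inside an ExecutionException)
            // or thrown directly by the send call (wrapped error).
            return unwrap(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }
}
```

With a helper of this kind the caller sees the same `AuthError` regardless of which thread won the race, which is roughly the consistency the issue asks the producer itself to provide.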
[jira] [Updated] (KAFKA-14151) Add validation to fail fast when base offsets are incorrectly assigned to batches
[ https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-14151: - Summary: Add validation to fail fast when base offsets are incorrectly assigned to batches (was: Add additional validation to protect on-disk log segment data from being corrupted) > Add validation to fail fast when base offsets are incorrectly assigned to > batches > - > > Key: KAFKA-14151 > URL: https://issues.apache.org/jira/browse/KAFKA-14151 > Project: Kafka > Issue Type: Improvement > Components: log >Reporter: Vincent Jiang >Priority: Major > > We saw a case where records with incorrect offsets were being written to log > segment on-disk data due to environmental issues (bug in old version JVM > JIT). We should consider adding additional validation to detect this scenario > and fail fast. -- This message was sent by Atlassian Jira (v8.20.10#820010)
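The issue does not say which check is intended. As one possible illustration of "fail fast on incorrectly assigned base offsets", the sketch below assumes that batches being appended must carry contiguous offsets starting at the current log end offset. `BaseOffsetValidator`, `Batch` and `validate` are hypothetical names, and this is not the Kafka log layer code.

```java
import java.util.List;

final class BaseOffsetValidator {
    private BaseOffsetValidator() { }

    // Hypothetical, heavily simplified view of a record batch.
    static final class Batch {
        final long baseOffset;
        final int recordCount;

        Batch(long baseOffset, int recordCount) {
            this.baseOffset = baseOffset;
            this.recordCount = recordCount;
        }

        long lastOffset() {
            return baseOffset + recordCount - 1;
        }
    }

    // Checks that every batch starts exactly where the previous one ended.
    // Returns the next expected offset, or throws before anything is written.
    static long validate(long logEndOffset, List<Batch> batches) {
        long expected = logEndOffset;
        for (Batch batch : batches) {
            if (batch.recordCount <= 0) {
                throw new IllegalArgumentException(
                    "Empty batch at base offset " + batch.baseOffset);
            }
            if (batch.baseOffset != expected) {
                throw new IllegalStateException(
                    "Batch has base offset " + batch.baseOffset + " but expected " + expected);
            }
            expected = batch.lastOffset() + 1;
        }
        return expected;
    }
}
```

Running such a check before the write means a mis-assigned offset surfaces as an exception on the append path instead of as bad data in a segment file.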
[jira] [Updated] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted
[ https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dhruvil Shah updated KAFKA-14151: - Description: We saw a case where records with incorrect offsets were being written to log segment on-disk data due to environmental issues (bug in old version JVM JIT). We should consider adding additional validation to detect this scenario and fail fast. (was: We received escalations reporting bad records being written to log segment on-disk data due to environmental issues (bug in old version JVM jit). We should consider adding additional validation to protect the on-disk data from being corrupted by inadvertent bugs or environmental issues) > Add additional validation to protect on-disk log segment data from being > corrupted > -- > > Key: KAFKA-14151 > URL: https://issues.apache.org/jira/browse/KAFKA-14151 > Project: Kafka > Issue Type: Improvement > Components: log >Reporter: Vincent Jiang >Priority: Major > > We saw a case where records with incorrect offsets were being written to log > segment on-disk data due to environmental issues (bug in old version JVM > JIT). We should consider adding additional validation to detect this scenario > and fail fast. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted
[ https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vincent Jiang updated KAFKA-14151: -- Priority: Major (was: Minor) > Add additional validation to protect on-disk log segment data from being > corrupted > -- > > Key: KAFKA-14151 > URL: https://issues.apache.org/jira/browse/KAFKA-14151 > Project: Kafka > Issue Type: Improvement > Components: log >Reporter: Vincent Jiang >Priority: Major > > We received escalations reporting bad records being written to log segment > on-disk data due to environmental issues (bug in old version JVM jit). We > should consider adding additional validation to protect the on-disk data > from being corrupted by inadvertent bugs or environmental issues -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted
Vincent Jiang created KAFKA-14151: - Summary: Add additional validation to protect on-disk log segment data from being corrupted Key: KAFKA-14151 URL: https://issues.apache.org/jira/browse/KAFKA-14151 Project: Kafka Issue Type: Improvement Components: log Reporter: Vincent Jiang We received escalations reporting bad records being written to log segment on-disk data due to environmental issues (bug in old version JVM jit). We should consider adding additional validation to protect the on-disk data from being corrupted by inadvertent bugs or environmental issues -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14151) Add additional validation to protect on-disk log segment data from being corrupted
[ https://issues.apache.org/jira/browse/KAFKA-14151?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vincent Jiang updated KAFKA-14151: -- Priority: Minor (was: Major) > Add additional validation to protect on-disk log segment data from being > corrupted > -- > > Key: KAFKA-14151 > URL: https://issues.apache.org/jira/browse/KAFKA-14151 > Project: Kafka > Issue Type: Improvement > Components: log >Reporter: Vincent Jiang >Priority: Minor > > We received escalations reporting bad records being written to log segment > on-disk data due to environmental issues (bug in old version JVM jit). We > should consider adding additional validation to protect the on-disk data > from being corrupted by inadvertent bugs or environmental issues -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hachikuji commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode
hachikuji commented on code in PR #12487: URL: https://github.com/apache/kafka/pull/12487#discussion_r941604866 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -881,11 +881,16 @@ class Partition(val topicPartition: TopicPartition, private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = { metadataCache match { // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are - // allowed to join the ISR. This does not apply to ZK mode. + // allowed to join the ISR. case kRaftMetadataCache: KRaftMetadataCache => !kRaftMetadataCache.isBrokerFenced(followerReplicaId) && !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId) + // In ZK mode, we just ensure the broker is alive. Although we do not check for shutting down brokers here,' Review Comment: nit: unintentional apostrophe at the end?
[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode
jolshan commented on code in PR #12487: URL: https://github.com/apache/kafka/pull/12487#discussion_r941598423 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition, private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = { metadataCache match { // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are - // allowed to join the ISR. This does not apply to ZK mode. + // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down. Review Comment: Ah my comment is slightly different in the latest commit. Let me know if I should change it
[GitHub] [kafka] hachikuji commented on a diff in pull request #12469: KAFKA-13914: Add command line tool kafka-metadata-quorum.sh
hachikuji commented on code in PR #12469: URL: https://github.com/apache/kafka/pull/12469#discussion_r941548893 ## core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala: ## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.admin + +import kafka.tools.TerseFailure +import kafka.utils.Exit +import net.sourceforge.argparse4j.ArgumentParsers +import net.sourceforge.argparse4j.impl.Arguments.fileType +import net.sourceforge.argparse4j.inf.Subparsers +import org.apache.kafka.clients._ +import org.apache.kafka.clients.admin.{Admin, QuorumInfo} +import org.apache.kafka.common.utils.Utils + +import java.io.File +import java.util.Properties +import scala.jdk.CollectionConverters._ + +/** + * A tool for describing quorum status + */ +object MetadataQuorumCommand { + + def main(args: Array[String]): Unit = { +val res = mainNoExit(args) +Exit.exit(res) + } + + def mainNoExit(args: Array[String]): Int = { +val parser = ArgumentParsers.newArgumentParser("kafka-metadata-quorum") + .defaultHelp(true) + .description("This tool describes kraft metadata quorum status.") +parser.addArgument("--bootstrap-server") + .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") + .required(true) + +parser.addArgument("--command-config") + .`type`(fileType()) + .help("Property file containing configs to be passed to Admin Client.") +val subparsers = parser.addSubparsers().dest("command") +addDescribeParser(subparsers) + +try { + val namespace = parser.parseArgsOrFail(args) + val command = namespace.getString("command") + + val commandConfig = namespace.get[File]("command_config") + val props = if (commandConfig != null) { +if (!commandConfig.exists()) { + throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!") +} +Utils.loadProps(commandConfig.getPath) + } else { +new Properties() + } + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server")) + val admin = Admin.create(props) + + if (command == "describe") { +handleDescribe(admin) + } else { +// currently we only support describe + } + admin.close() + 0 +} catch { + case e: TerseFailure => 
+Console.err.println(e.getMessage) +1 +} + } + + def addDescribeParser(subparsers: Subparsers): Unit = { +subparsers.addParser("describe") + .help("Describe the metadata quorum info") + } + + def handleDescribe(admin: Admin): Unit = { Review Comment: The output seems a bit different from what was documented in KIP-595 (and KIP-836): - https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum#KIP595:ARaftProtocolfortheMetadataQuorum-ToolingSupport. - https://cwiki.apache.org/confluence/display/KAFKA/KIP-836:+Addition+of+Information+in+DescribeQuorumResponse+about+Voter+Lag Any reason to change it? ## core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala: ## @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.tools.TerseFailure +import kafka.utils.Exit +import net.sourceforge.argparse4j.ArgumentParsers +import
[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode
jolshan commented on code in PR #12487: URL: https://github.com/apache/kafka/pull/12487#discussion_r941543760 ## core/src/main/scala/kafka/cluster/Partition.scala: ## @@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition, private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = { metadataCache match { // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are - // allowed to join the ISR. This does not apply to ZK mode. + // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down. Review Comment: Ok makes sense. Should I change the comment to reflect that this will not block shutting down brokers here, but will be blocked controller side? I think for at least this PR (which we want to get into 3.3) we should hold off on protocol changes.
[GitHub] [kafka] jolshan commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode
jolshan commented on code in PR #12487: URL: https://github.com/apache/kafka/pull/12487#discussion_r941537699 ## core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala: ## @@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness { assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS)) } + @ParameterizedTest + @MethodSource(Array("testAlterPartitionVersionSource")) Review Comment: thanks! I was trying to remember how to do this.
[jira] [Resolved] (KAFKA-14144) AlterPartition is not idempotent when requests time out
[ https://issues.apache.org/jira/browse/KAFKA-14144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-14144. - Resolution: Fixed > AlterPartition is not idempotent when requests time out > --- > > Key: KAFKA-14144 > URL: https://issues.apache.org/jira/browse/KAFKA-14144 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.3.0 >Reporter: David Mao >Assignee: David Mao >Priority: Blocker > Fix For: 3.3.0 > > > [https://github.com/apache/kafka/pull/12032] changed the validation order of > AlterPartition requests to fence requests with a stale partition epoch before > we compare the leader and ISR contents. > This results in a loss of idempotency if a leader does not receive an > AlterPartition response because retries will receive an > INVALID_UPDATE_VERSION error. -- This message was sent by Atlassian Jira (v8.20.10#820010)
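The validation-order problem described in KAFKA-14144 can be illustrated with a minimal sketch. This is not the controller's actual code: the class `AlterPartitionOrderSketch`, its fields, and the `handle` method are hypothetical, and the model ignores leader epochs and broker liveness. It only shows why comparing the requested leader and ISR against the current state before fencing on the partition epoch keeps a retried request idempotent.

```java
import java.util.List;

// Hypothetical, simplified model of controller-side AlterPartition handling.
final class AlterPartitionOrderSketch {
    enum Result { OK, INVALID_UPDATE_VERSION }

    private int leader;
    private List<Integer> isr;
    private int partitionEpoch;

    AlterPartitionOrderSketch(int leader, List<Integer> isr, int partitionEpoch) {
        this.leader = leader;
        this.isr = List.copyOf(isr);
        this.partitionEpoch = partitionEpoch;
    }

    Result handle(int requestLeader, List<Integer> requestIsr, int requestPartitionEpoch) {
        // 1. Compare contents first: a retry of an already-applied update is a no-op.
        if (requestLeader == leader && requestIsr.equals(isr)) {
            return Result.OK;
        }
        // 2. Only then fence requests that were built on a stale partition epoch.
        if (requestPartitionEpoch != partitionEpoch) {
            return Result.INVALID_UPDATE_VERSION;
        }
        // 3. Apply the update and bump the partition epoch.
        leader = requestLeader;
        isr = List.copyOf(requestIsr);
        partitionEpoch++;
        return Result.OK;
    }

    int partitionEpoch() {
        return partitionEpoch;
    }
}
```

If steps 1 and 2 were swapped, a leader that never received the first response would retry with the old partition epoch and be rejected with INVALID_UPDATE_VERSION even though its update had already been applied.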
[GitHub] [kafka] hachikuji merged pull request #12489: KAFKA-14144: Compare AlterPartition LeaderAndIsr before fencing partition epoch
hachikuji merged PR #12489: URL: https://github.com/apache/kafka/pull/12489
[jira] [Updated] (KAFKA-14134) Replace EasyMock with Mockito for WorkerConnectorTest
[ https://issues.apache.org/jira/browse/KAFKA-14134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison updated KAFKA-14134: --- Fix Version/s: 3.4.0 > Replace EasyMock with Mockito for WorkerConnectorTest > - > > Key: KAFKA-14134 > URL: https://issues.apache.org/jira/browse/KAFKA-14134 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Yash Mayya >Priority: Minor > Fix For: 3.4.0
[GitHub] [kafka] yashmayya commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors
yashmayya commented on code in PR #12450: URL: https://github.com/apache/kafka/pull/12450#discussion_r941408595 ## docs/connect.html: ## @@ -440,19 +443,17 @@ Connector @Override public List<Map<String, String>> taskConfigs(int maxTasks) { +// This method is where connectors provide the task configs for the tasks that are to be created for this connector. +// The length of the list determines the number of tasks that need to be created. The FileStreamSourceConnector, however, is +// only capable of spinning up a single task (since there isn't work that can be distributed among multiple tasks). +// Note that the task configs could contain configs additional to or different from the connector configs if needed (for instance, +// if different tasks have different responsibilities, or if different tasks are meant to process different subsets of the source data stream). Review Comment: Ah, fair point. I've trimmed it down to a couple of sentences.
[GitHub] [kafka] C0urante commented on a diff in pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore
C0urante commented on code in PR #12490: URL: https://github.com/apache/kafka/pull/12490#discussion_r941405027 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v connectorConfigs.remove(connectorName); connectorTaskCounts.remove(connectorName); taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName)); +deferredTaskUpdates.remove(connectorName); +connectorTaskCountRecords.remove(connectorName); Review Comment: > Tbh, it doesn't really seem like it's worth the mess of null handling everywhere. I'm gonna back out this change and make this a single line PR A single line PR... with tests? Your understanding is correct--we only track task generations in memory, different herders may have different generations for the same set of task configs, and we use generations to abort task startup after initializing their transactional producer and to abort persisting zombie fencing records to the config topic. The reason this is all fine is that we don't really need to track an exact generation number; all we have to do is track whether a new set of task configs for a given connector has appeared after a specific set of task configs. Compaction should not change the fact that, once we have a generation number for a set of task configs, generation numbers for later task configs will be greater than that number.
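The point about generations only needing to be ordered, not exact, can be sketched as follows. This is an illustration under stated assumptions rather than the code in KafkaConfigBackingStore: the class `TaskGenerationSketch` and its method names are hypothetical, and the counter lives purely in memory, as the comment describes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory generation tracker; names are illustrative only.
final class TaskGenerationSketch {
    private final Map<String, Integer> generations = new HashMap<>();

    // Called whenever a new set of task configs is observed for a connector.
    // Returns the generation assigned to that set.
    int recordNewTaskConfigs(String connector) {
        return generations.merge(connector, 1, Integer::sum);
    }

    // Generation of the most recently observed task configs (0 if none yet).
    int currentGeneration(String connector) {
        return generations.getOrDefault(connector, 0);
    }

    // True if newer task configs have appeared since observedGeneration was read.
    boolean isStale(String connector, int observedGeneration) {
        return currentGeneration(connector) > observedGeneration;
    }
}
```

Two workers that have seen a different number of records (for example because of compaction) may disagree on the absolute numbers, but each still answers "has anything newer appeared since I started?" correctly, which is all the startup and fencing checks need.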
[GitHub] [kafka] C0urante merged pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest
C0urante merged PR #12472: URL: https://github.com/apache/kafka/pull/12472
[GitHub] [kafka] C0urante commented on a diff in pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest
C0urante commented on code in PR #12472: URL: https://github.com/apache/kafka/pull/12472#discussion_r941388856 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java: ## @@ -120,43 +105,21 @@ public void testInitializeFailure() { workerConnector.doShutdown(); assertStoppedMetric(workerConnector); -verifyAll(); +verifyCleanInitialize(); +verify(listener).onFailure(CONNECTOR, exception); +verify(listener).onShutdown(CONNECTOR); Review Comment: Yeah, that's fair. LGTM
[GitHub] [kafka] C0urante commented on a diff in pull request #12450: KAFKA-13809: Propagate full connector configuration to tasks in FileStream connectors
C0urante commented on code in PR #12450: URL: https://github.com/apache/kafka/pull/12450#discussion_r941371749 ## connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java: ## @@ -20,36 +20,35 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; /** - * Very simple connector that works with the console. This connector supports both source and - * sink modes via its 'mode' setting. + * Very simple source connector that works with stdin or a file. */ public class FileStreamSourceConnector extends SourceConnector { + +private static final Logger log = LoggerFactory.getLogger(FileStreamSourceConnector.class); public static final String TOPIC_CONFIG = "topic"; public static final String FILE_CONFIG = "file"; public static final String TASK_BATCH_SIZE_CONFIG = "batch.size"; public static final int DEFAULT_TASK_BATCH_SIZE = 2000; -private static final ConfigDef CONFIG_DEF = new ConfigDef() +static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. 
If not specified, the standard input will be used") -.define(TOPIC_CONFIG, Type.LIST, Importance.HIGH, "The topic to publish data to") +.define(TOPIC_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new ConfigDef.NonEmptyString(), Importance.HIGH, "The topic to publish data to") .define(TASK_BATCH_SIZE_CONFIG, Type.INT, DEFAULT_TASK_BATCH_SIZE, Importance.LOW, "The maximum number of records the Source task can read from file one time"); Review Comment: ```suggestion "The maximum number of records the source task can read from the file each time it is polled"); ``` ## docs/connect.html: ## @@ -423,9 +422,13 @@ Connector @Override public void start(Map<String, String> props) { -// The complete version includes error handling as well. -filename = props.get(FILE_CONFIG); -topic = props.get(TOPIC_CONFIG); +// All initialization logic and setting up of resources goes in this method. The FileStreamSourceConnector, however, doesn't need such logic here. Review Comment: ```suggestion // Initialization logic and setting up resources can take place in this method. This connector doesn't need to do any of that, but we do log a helpful message to the user. ``` ## docs/connect.html: ## @@ -440,19 +443,17 @@ Connector @Override public List<Map<String, String>> taskConfigs(int maxTasks) { +// This method is where connectors provide the task configs for the tasks that are to be created for this connector. +// The length of the list determines the number of tasks that need to be created. The FileStreamSourceConnector, however, is +// only capable of spinning up a single task (since there isn't work that can be distributed among multiple tasks). +// Note that the task configs could contain configs additional to or different from the connector configs if needed (for instance, +// if different tasks have different responsibilities, or if different tasks are meant to process different subsets of the source data stream). Review Comment: This is really verbose. Can we simplify? 
I was hoping we'd be able to spell things out here in 1-2 lines. Keep in mind that the next paragraph provides a lot of useful info already: > Even with multiple tasks, this method implementation is usually pretty simple. It just has to determine the number of input tasks, which may require contacting the remote service it is pulling data from, and then divvy them up. Because some patterns for splitting work among tasks are so common, some utilities are provided in ConnectorUtils to simplify these cases. ## docs/connect.html: ## @@ -609,9 +618,11 @@ Connect Configuration Valida The following code in FileStreamSourceConnector defines the configuration and exposes it to the framework. -private static final ConfigDef CONFIG_DEF = new ConfigDef() -.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.") -.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to"); +static final ConfigDef CONFIG_DEF = new ConfigDef() +.define(FILE_CONFIG, Type.STRING, null, Importance.HIGH, "Source filename. If not specified, the standard input will be used") +.define(TOPIC_CONFIG, Type.STRING,
[GitHub] [kafka] mimaison commented on pull request #12494: MINOR: Update site docs for ASF compliance
mimaison commented on PR #12494: URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209402011 Thanks @bbejeck! Should we fix the issues I reported in https://github.com/apache/kafka-site/pull/433#issuecomment-1205049698 before merging the images back to Kafka?
[GitHub] [kafka] bbejeck commented on pull request #12494: MINOR: Update site docs for ASF compliance
bbejeck commented on PR #12494: URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209389661 @mimaison added images
[jira] [Comment Edited] (KAFKA-14014) Flaky test NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-14014?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17573211#comment-17573211 ] Matthew de Detrich edited comment on KAFKA-14014 at 8/9/22 12:58 PM: - In case people want to reproduce the flakiness, assuming you have a working Docker installation, you can do the following: {code:java} docker run -it --cpus=2 --rm -u gradle -v "$PWD":/home/gradle/project -w /home/gradle/project gradle sh{code} where cpus=2 controls how many CPUs the container gets (there is a tradeoff between how often the flakiness occurs and how fast the test runs). I wouldn't recommend going lower than cpus=2, otherwise even building the Kafka project in Gradle can take ages. The above command will put you into a shell, at which point you can run {code:java} while [ $? -eq 0 ]; do ./gradlew :streams:test --tests org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets; done{code} which will re-run the test until there is a failure. Note that because Gradle caches test runs, you need to do something like [https://github.com/gradle/gradle/issues/9151#issue-434212465] or add {code:java} test.outputs.upToDateWhen {false}{code} in order to force Gradle to re-run the test every time.
> Flaky test > NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() > > > Key: KAFKA-14014 > URL: https://issues.apache.org/jira/browse/KAFKA-14014 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bruno Cadonna >Assignee: Matthew de Detrich >Priority: Critical > Labels: flaky-test > > {code:java} > java.lang.AssertionError: > Expected: <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 2)]> > but: was <[KeyValue(B, 1), KeyValue(A, 2), KeyValue(C, 1)]> > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) > at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) > at > org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets(NamedTopologyIntegrationTest.java:540) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:568) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:299) > at > org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:293) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) >
[GitHub] [kafka] bbejeck commented on pull request #12494: MINOR: Update site docs for ASF compliance
bbejeck commented on PR #12494: URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209340844 > The path of the images is `/{{version}}/images/`, so do we need to add them to this repo too? Yes, I overlooked that; will do.
[GitHub] [kafka] vamossagar12 commented on pull request #12485: KAFKA-14131: Adding InterruptException when reading to end of Offseto…
vamossagar12 commented on PR #12485: URL: https://github.com/apache/kafka/pull/12485#issuecomment-1209328983 > This should definitely come with a test :) > > I'm also not sure this is the best approach, since the [ExecutorService::shutdownNow Javadocs](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html#shutdownNow()) don't give us any guarantees about threads being interrupted: > > > There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via [Thread.interrupt()](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/Thread.html#interrupt()), so any task that fails to respond to interrupts may never terminate. > > And on top of that, by the time the thread is interrupted--if it's interrupted at all--we've already exhausted our graceful shutdown timeout. > > Can we leverage the existing shutdown logic in `KafkaBasedLog::stop` somehow, possibly by accounting for a `WakeupException` being thrown while reading to the end of the log during `KafkaBasedLog::start`? I'm not certain that it's safe to stop a log while another thread is in the middle of starting the log; we may have to tweak some of the logic there. We may also have to wake up/interrupt/shut down the admin client (if we're using one to read offsets) since that too could potentially block (but perhaps not indefinitely).
Thanks Chris. I agree that we do need a test; I'll figure out how to add one. I think you brought up a great point. Here's what I understood from the issue. If a DistributedHerder gets a signal to shut down in its `stop` method, it invokes `shutdown` and waits for the graceful shutdown timeout. `awaitTermination` can throw an `InterruptedException`, and once that's thrown, the KafkaBasedLog remains in an infinite loop, since I believe the initial readToEnd runs on the same thread (it is switched to a `WorkerThread` later on). That's the interrupted exception that I was looking to handle in this PR. Do you think that makes sense? IIUC, the scenario you have described is that, even after the graceful shutdown, if the log isn't stopped and the herder never got interrupted, the potential infinite loop issue still remains. Is that correct? I think using `KafkaBasedLog::stop` might be a good idea in this case. BTW, I see there's something called `stopServices`, which effectively stops the backing consumers. That inherently calls the `KafkaBasedLog::stop` method, and it's all wired through the `halt` method, which checks a `stopping` flag that is set in the `stop` method (BTW, I know you know all this, I'm just writing it down for my own confirmation :D). Do you think the situation you described would still arise, or was the only unhandled case the interruption of the herder executor? WDYT?
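The behaviour of `ExecutorService::shutdownNow` quoted above can be sketched with the JDK alone. This is only an illustration, not Connect code: the class `InterruptAwareLoopSketch` and the sleep-based loop are hypothetical stand-ins for a "read to end" loop. It shows that a task terminates on `shutdownNow` only because it reacts to the interrupt; a loop that swallowed the `InterruptedException` and kept going would never finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a long-running loop that must respond to interrupts.
final class InterruptAwareLoopSketch {
    // Returns true if the worker observed the interrupt and the executor terminated.
    static boolean runAndShutdown() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);

        executor.execute(() -> {
            started.countDown();
            while (true) {
                try {
                    Thread.sleep(50); // placeholder for a blocking poll
                } catch (InterruptedException e) {
                    sawInterrupt.set(true);
                    Thread.currentThread().interrupt(); // restore the flag for callers
                    return; // leave the loop instead of spinning forever
                }
            }
        });

        try {
            started.await();        // make sure the task is actually running
            executor.shutdownNow(); // best-effort cancellation via Thread.interrupt()
            boolean terminated = executor.awaitTermination(5, TimeUnit.SECONDS);
            return terminated && sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

As the quoted Javadoc notes, this is best-effort: the interrupt only helps if the blocking call inside the loop is itself interruptible, which is why an explicit stop or wakeup path may still be preferable.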
[GitHub] [kafka] cadonna opened a new pull request, #12497: KAFKA-10199: Expose read only task from state updater
cadonna opened a new pull request, #12497: URL: https://github.com/apache/kafka/pull/12497 The state updater exposes tasks that are in restoration to the stream thread. To ensure that the stream thread only accesses the tasks to read from the tasks without modifying any internal state, this PR introduces a read-only task that throws an exception if the caller tries to modify the internal state of a task. This PR also returns read-only tasks from DefaultStateUpdater#getTasks(). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
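The read-only wrapper idea from this PR description can be sketched as a plain decorator. The sketch below is not the Kafka Streams implementation: the `Task` interface here is a hypothetical three-method stand-in, and the choice of `UnsupportedOperationException` is an assumption, since the description only says that an exception is thrown.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class ReadOnlyTaskSketch {
    // Hypothetical, heavily reduced stand-in for a Streams task.
    interface Task {
        String id();
        List<String> inputPartitions();
        void addPartition(String partition);
    }

    static final class SimpleTask implements Task {
        private final String id;
        private final List<String> partitions = new ArrayList<>();

        SimpleTask(String id) {
            this.id = id;
        }

        @Override public String id() { return id; }
        @Override public List<String> inputPartitions() { return partitions; }
        @Override public void addPartition(String partition) { partitions.add(partition); }
    }

    // Delegates reads to the wrapped task and rejects every mutating call.
    static final class ReadOnlyTask implements Task {
        private final Task delegate;

        ReadOnlyTask(Task delegate) {
            this.delegate = delegate;
        }

        @Override public String id() { return delegate.id(); }

        @Override public List<String> inputPartitions() {
            // Also guard against mutation through the returned collection.
            return Collections.unmodifiableList(delegate.inputPartitions());
        }

        @Override public void addPartition(String partition) {
            throw new UnsupportedOperationException("task " + id() + " is read-only");
        }
    }
}
```

A caller holding only the wrapper can inspect the task but cannot change it, while the owner of the underlying task keeps full access.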
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14133: -- Description: {color:#DE350B}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#FF8B00}In Review{color} # WorkerConnectorTest (connect) (owner: Yash) # WorkerCoordinatorTest (connect) # RootResourceTest (connect) # ByteArrayProducerRecordEquals (connect) # TopologyTest # {color:#FF8B00}KStreamFlatTransformTest{color} (owner: Christo) # {color:#FF8B00}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#FF8B00}KStreamPrintTest{color} (owner: Christo) # {color:#FF8B00}KStreamRepartitionTest{color} (owner: Christo) # {color:#FF8B00}MaterializedInternalTest{color} (owner: Christo) # {color:#FF8B00}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#FF8B00}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#FF8B00}ClientUtilsTest{color} (owner: Christo) # {color:#FF8B00}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # KTableSuppressProcessorTest # InternalTopicManagerTest # ProcessorContextImplTest # ProcessorStateManagerTest # StandbyTaskTest # StoreChangelogReaderTest # StreamTaskTest # StreamThreadTest # StreamsAssignmentScaleTest (*WIP* owner: Christo) # StreamsPartitionAssignorTest (*WIP* owner: Christo) # StreamsRebalanceListenerTest # TaskManagerTest # TimestampedKeyValueStoreMaterializerTest # WriteConsistencyVectorTest # AssignmentTestUtils (*WIP* owner: Christo) # StreamsMetricsImplTest # CachingInMemoryKeyValueStoreTest # CachingInMemorySessionStoreTest # 
CachingPersistentSessionStoreTest # CachingPersistentWindowStoreTest # ChangeLoggingKeyValueBytesStoreTest # ChangeLoggingSessionBytesStoreTest # ChangeLoggingTimestampedKeyValueBytesStoreTest # ChangeLoggingTimestampedWindowBytesStoreTest # ChangeLoggingWindowBytesStoreTest # CompositeReadOnlyWindowStoreTest # KeyValueStoreBuilderTest # MeteredTimestampedWindowStoreTest # RocksDBStoreTest # StreamThreadStateStoreProviderTest # TimeOrderedCachingPersistentWindowStoreTest # TimeOrderedWindowStoreTest *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#DE350B}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: # WorkerConnectorTest (connect) (owner: Yash) # WorkerCoordinatorTest (connect) # RootResourceTest (connect) # ByteArrayProducerRecordEquals (connect) # TopologyTest # {color:#FF8B00}KStreamFlatTransformTest{color} (owner: Christo) # {color:#FF8B00}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#FF8B00}KStreamPrintTest{color} (owner: Christo) # {color:#FF8B00}KStreamRepartitionTest{color} (owner: Christo) # {color:#FF8B00}MaterializedInternalTest{color} (owner: Christo) # {color:#FF8B00}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#FF8B00}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#FF8B00}ClientUtilsTest{color} (owner: Christo) # {color:#FF8B00}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # KTableSuppressProcessorTest # InternalTopicManagerTest # ProcessorContextImplTest # ProcessorStateManagerTest # StandbyTaskTest # StoreChangelogReaderTest # 
StreamTaskTest # StreamThreadTest # StreamsAssignmentScaleTest (*WIP* owner: Christo) # StreamsPartitionAssignorTest (*WIP* owner: Christo) # StreamsRebalanceListenerTest # TaskManagerTest # TimestampedKeyValueStoreMaterializerTest # WriteConsistencyVectorTest # AssignmentTestUtils (*WIP* owner: Christo) # StreamsMetricsImplTest # CachingInMemoryKeyValueStoreTest # CachingInMemorySessionStoreTest # CachingPersistentSessionStoreTest # CachingPersistentWindowStoreTest # ChangeLoggingKeyValueBytesStoreTest # ChangeLoggingSessionBytesStoreTest # ChangeLoggingTimestampedKeyValueBytesStoreTest # ChangeLoggingTimestampedWindowBytesStoreTest # ChangeLoggingWindowBytesStoreTest # CompositeReadOnlyWindowStoreTest # KeyValueStoreBuilderTest # MeteredTimestampedWindowStoreTest # RocksDBStoreTest # StreamThreadStateStoreProviderTest # TimeOrderedCachingPersistentWindowStoreTest # TimeOrderedWindowStoreTest *The coverage report for the above tests
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christo Lolov updated KAFKA-14133:
--
Description:
{color:#DE350B}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still need to be moved from EasyMock to Mockito as of 2 August 2022, and which have neither a Jira issue nor an open pull request that I am aware of:

# WorkerConnectorTest (connect) (owner: Yash)
# WorkerCoordinatorTest (connect)
# RootResourceTest (connect)
# ByteArrayProducerRecordEquals (connect)
# TopologyTest
# {color:#FF8B00}KStreamFlatTransformTest{color} (owner: Christo)
# {color:#FF8B00}KStreamFlatTransformValuesTest{color} (owner: Christo)
# {color:#FF8B00}KStreamPrintTest{color} (owner: Christo)
# {color:#FF8B00}KStreamRepartitionTest{color} (owner: Christo)
# {color:#FF8B00}MaterializedInternalTest{color} (owner: Christo)
# {color:#FF8B00}TransformerSupplierAdapterTest{color} (owner: Christo)
# {color:#FF8B00}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
# {color:#FF8B00}ClientUtilsTest{color} (owner: Christo)
# {color:#FF8B00}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo)
# KTableSuppressProcessorTest
# InternalTopicManagerTest
# ProcessorContextImplTest
# ProcessorStateManagerTest
# StandbyTaskTest
# StoreChangelogReaderTest
# StreamTaskTest
# StreamThreadTest
# StreamsAssignmentScaleTest (*WIP* owner: Christo)
# StreamsPartitionAssignorTest (*WIP* owner: Christo)
# StreamsRebalanceListenerTest
# TaskManagerTest
# TimestampedKeyValueStoreMaterializerTest
# WriteConsistencyVectorTest
# AssignmentTestUtils (*WIP* owner: Christo)
# StreamsMetricsImplTest
# CachingInMemoryKeyValueStoreTest
# CachingInMemorySessionStoreTest
# CachingPersistentSessionStoreTest
# CachingPersistentWindowStoreTest
# ChangeLoggingKeyValueBytesStoreTest
# ChangeLoggingSessionBytesStoreTest
# ChangeLoggingTimestampedKeyValueBytesStoreTest
# ChangeLoggingTimestampedWindowBytesStoreTest
# ChangeLoggingWindowBytesStoreTest
# CompositeReadOnlyWindowStoreTest
# KeyValueStoreBuilderTest
# MeteredTimestampedWindowStoreTest
# RocksDBStoreTest
# StreamThreadStateStoreProviderTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= what the coverage is now.*

was:
{color:#DE350B}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still need to be moved from EasyMock to Mockito as of 2 August 2022, and which have neither a Jira issue nor an open pull request that I am aware of:

# WorkerConnectorTest (connect) (owner: Yash)
# WorkerCoordinatorTest (connect)
# RootResourceTest (connect)
# ByteArrayProducerRecordEquals (connect)
# TopologyTest
# KStreamFlatTransformTest (owner: Christo)
# KStreamFlatTransformValuesTest (owner: Christo)
# KStreamPrintTest (owner: Christo)
# KStreamRepartitionTest (owner: Christo)
# MaterializedInternalTest (owner: Christo)
# TransformerSupplierAdapterTest (owner: Christo)
# KTableSuppressProcessorMetricsTest (owner: Christo)
# KTableSuppressProcessorTest
# ClientUtilsTest (owner: Christo)
# HighAvailabilityStreamsPartitionAssignorTest (owner: Christo)
# InternalTopicManagerTest
# ProcessorContextImplTest
# ProcessorStateManagerTest
# StandbyTaskTest
# StoreChangelogReaderTest
# StreamTaskTest
# StreamThreadTest
# StreamsAssignmentScaleTest (*WIP* owner: Christo)
# StreamsPartitionAssignorTest (*WIP* owner: Christo)
# StreamsRebalanceListenerTest
# TaskManagerTest
# TimestampedKeyValueStoreMaterializerTest
# WriteConsistencyVectorTest
# AssignmentTestUtils (*WIP* owner: Christo)
# StreamsMetricsImplTest
# CachingInMemoryKeyValueStoreTest
# CachingInMemorySessionStoreTest
# CachingPersistentSessionStoreTest
# CachingPersistentWindowStoreTest
# ChangeLoggingKeyValueBytesStoreTest
# ChangeLoggingSessionBytesStoreTest
# ChangeLoggingTimestampedKeyValueBytesStoreTest
# ChangeLoggingTimestampedWindowBytesStoreTest
# ChangeLoggingWindowBytesStoreTest
# CompositeReadOnlyWindowStoreTest
# KeyValueStoreBuilderTest
# MeteredTimestampedWindowStoreTest
# RocksDBStoreTest
# StreamThreadStateStoreProviderTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= what the coverage is now.*

> Remaining EasyMock to Mockito tests
> ---
>
> Key: KAFKA-14133
> URL:
[jira] [Created] (KAFKA-14150) Allocation of initial partitions is deterministic and produces a leader bias when a broker is offline
David Buckley created KAFKA-14150:
-
Summary: Allocation of initial partitions is deterministic and produces a leader bias when a broker is offline
Key: KAFKA-14150
URL: https://issues.apache.org/jira/browse/KAFKA-14150
Project: Kafka
Issue Type: Improvement
Reporter: David Buckley

Observation of our current cluster suggests that with N brokers, the first N partitions are always allocated in a round-robin fashion with a random offset. The preferred leader is always the first in a given replica list (and hence is allocated round-robin, too). Subsequent brokers are allocated using some shuffle on the list, again in a round-robin, which I think is fine and doesn't show the bias I detail below.

Suppose every topic has as many partitions as there are brokers and a replication factor of 3. Then every topic has replicas {{N, N+1, N+2}} except where this wraps. Example:

* Topic A: 3 partitions, replicas {{012}}, {{120}}, {{201}}, leaders 0, 1, 2
* Topic B: 3 partitions, replicas {{120}}, {{201}}, {{012}}, leaders 1, 2, 0
* Topic C: 3 partitions, replicas {{201}}, {{012}}, {{120}}, leaders 2, 0, 1

This means that if broker {{x}} goes down, every partition that had {{x}} as its preferred leader now elects {{x+1}} as its leader. If broker 1 were offline, the leader allocation would look like:

* Topic A: 3 partitions, replicas {{02}}, {{20}}, {{20}}, leaders 0, 2, 2
* Topic B: 3 partitions, replicas {{20}}, {{20}}, {{02}}, leaders 2, 2, 0
* Topic C: 3 partitions, replicas {{20}}, {{02}}, {{20}}, leaders 2, 0, 2

We see that broker 2 becomes leader of 100% of the failed-over partitions, and is now leader of 2x as many partitions as broker 0.

If there were 6 brokers, we'd see that replica sets {{02}}, {{23}} and {{50}} would have reduced replication (and broker 4 isn't providing any redundancy for partitions replicated on broker 1), in addition to broker 2 leading 2x as many partitions as any other broker. Brokers 0 and 2 are now more critical than 3 and 5, which are in turn more critical than broker 4.

I'm unsure whether there are any undesirable side effects of this, but my expectation is that the behaviour isn't really intended, because subsequent partitions don't just replicate the round-robin of the first batch. Should the allocation of the initial partitions be completely random to avoid this bias, or is it inconsequential?
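The bias described in this report can be reproduced with a small simulation. The sketch below is not Kafka's assignment code; it only models the pattern the reporter observed, under these assumptions: one topic exists per possible start offset, each topic has as many partitions as there are brokers, replicas are consecutive broker ids, and the leader is the first live replica in the list. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class LeaderBiasSketch {

    /** Replica list for one partition: consecutive broker ids starting at (offset + partition) mod brokers. */
    static List<Integer> replicas(int brokers, int replicationFactor, int offset, int partition) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            result.add((offset + partition + i) % brokers);
        }
        return result;
    }

    /** The leader is the first replica in the list that is not offline; -1 if no replica is live. */
    static int leader(List<Integer> replicas, int offlineBroker) {
        for (int replica : replicas) {
            if (replica != offlineBroker) {
                return replica;
            }
        }
        return -1;
    }

    /** Number of partitions led by each broker, with one topic per start offset (assumption). */
    static int[] leaderCounts(int brokers, int replicationFactor, int offlineBroker) {
        int[] counts = new int[brokers];
        for (int offset = 0; offset < brokers; offset++) {
            for (int partition = 0; partition < brokers; partition++) {
                int elected = leader(replicas(brokers, replicationFactor, offset, partition), offlineBroker);
                if (elected >= 0) {
                    counts[elected]++;
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three brokers, broker 1 offline: matches topics A, B and C in the report.
        System.out.println(Arrays.toString(leaderCounts(3, 3, 1))); // prints [3, 0, 6]
        // Six brokers, broker 1 offline: broker 2 leads twice as many partitions as any other broker.
        System.out.println(Arrays.toString(leaderCounts(6, 3, 1))); // prints [6, 0, 12, 6, 6, 6]
    }
}
```

Passing an offline broker id that does not exist (for example -1) gives an even spread, which makes the skew after a failure easy to compare against the healthy case.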
[GitHub] [kafka] yashmayya commented on a diff in pull request #12472: KAFKA-14134: Replace EasyMock with Mockito for WorkerConnectorTest
yashmayya commented on code in PR #12472:
URL: https://github.com/apache/kafka/pull/12472#discussion_r941154092

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java: ##
@@ -672,6 +452,27 @@ protected void assertInitializedMetric(WorkerConnector workerConnector, String e
         assertEquals(VERSION, version);
     }
 
+    @SuppressWarnings("unchecked")
+    private Callback mockCallback() {
+        return mock(Callback.class);
+    }
+
+    private void verifyCleanInitialize() {
+        verify(connector).version();
+        if (connector instanceof SourceConnector) {
+            verify(offsetStore).start();
+            verify(connector).initialize(any(SourceConnectorContext.class));
+        } else {
+            verify(connector).initialize(any(SinkConnectorContext.class));
+        }
+    }
+
+    private void verifyCleanShutdown() {
+        verify(ctx).close();
+        verify(offsetStorageReader).close();

Review Comment:
Good catch, thanks! I just translated the tests as is and hadn't noticed that we were passing non-null offset storage reader and offset backing store instances even for sink connectors in these tests (which differs from how the actual `Worker` instantiates `WorkerConnector`s).

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java: ##
@@ -120,43 +105,21 @@ public void testInitializeFailure() {
         workerConnector.doShutdown();
         assertStoppedMetric(workerConnector);
 
-        verifyAll();
+        verifyCleanInitialize();
+        verify(listener).onFailure(CONNECTOR, exception);
+        verify(listener).onShutdown(CONNECTOR);

Review Comment:
The one for shutdown makes sense, but I think with startup there are three different cases:

i) No call to `connector.start` or `listener.onStartup` (failure in initialization OR started in the paused state)
ii) Calls to both `connector.start` and `listener.onStartup` (successful start)
iii) Call to only `connector.start` (connector started in the paused state and then resumed OR failure on startup)

I think it might be more readable to keep these verify calls in the test methods as is rather than trying to force-fit them into (in)appropriately named methods, WDYT?
[GitHub] [kafka] dengziming commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot
dengziming commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r941141588

## core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala: ##
@@ -59,11 +60,19 @@ class BrokerMetadataSnapshotter(
       val writer = writerBuilder.build(
         image.highestOffsetAndEpoch().offset,
         image.highestOffsetAndEpoch().epoch,
-        lastContainedLogTime
-      )
+        lastContainedLogTime)
       if (writer.nonEmpty) {
         _currentSnapshotOffset = image.highestOffsetAndEpoch().offset
-        info(s"Creating a new snapshot at offset ${_currentSnapshotOffset}...")
+
+        var stringReasons: Set[String] = Set()
+
+        snapshotReasons.foreach(r => stringReasons += r.toString)
+
+        if (stringReasons.isEmpty){
+          stringReasons += SnapshotReason.UnknownReason.toString
+        }
+
+        info(s"Creating a new snapshot at offset ${_currentSnapshotOffset} because, ${stringReasons.mkString(" and ")}")

Review Comment:
We could use `snapshotReasons.mkString(" and ")` directly here and remove `stringReasons` and the related code.

## raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java: ##
@@ -107,6 +107,8 @@ public synchronized void handleCommit(BatchReader reader) {
         }
         log.debug("Counter incremented from {} to {}", initialCommitted, committed);
+        // A snapshot is being taken here too, not being able to -
+        // `import org.apache.kafka.metadata.utils.SnapshotReason`, figure out why?

Review Comment:
The `raft` module only relies on the `clients` module, so it can't import a class from the `metadata` module.
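The first review comment above asks for the reasons to be joined in place rather than copied into a temporary set. A rough Java sketch of that idea follows (the diff itself is Scala, where `mkString` plays the role of `Collectors.joining`). The `Reason` enum, its values other than the unknown one, the message texts, and the fallback for an empty collection are all assumptions made for illustration; they are not taken from the PR.

```java
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

final class SnapshotReasonLogSketch {

    // Hypothetical stand-in for the PR's SnapshotReason type; only UNKNOWN mirrors a name in the diff.
    enum Reason {
        UNKNOWN("the reason is unknown"),
        MAX_BYTES_EXCEEDED("max bytes were exceeded"),
        METADATA_VERSION_CHANGED("metadata version was changed");

        private final String text;

        Reason(String text) {
            this.text = text;
        }

        @Override
        public String toString() {
            return text;
        }
    }

    /** Builds the log line by joining the reasons directly, with no intermediate mutable set. */
    static String logLine(long offset, Collection<Reason> reasons) {
        // Assumption: an empty collection falls back to the unknown reason, as the original hunk does.
        Collection<Reason> effective = reasons.isEmpty() ? List.of(Reason.UNKNOWN) : reasons;
        String joined = effective.stream()
                .map(Reason::toString)
                .collect(Collectors.joining(" and "));
        return "Creating a new snapshot at offset " + offset + " because " + joined;
    }
}
```

Whether the empty case needs a fallback at all depends on whether the caller can ever pass an empty collection, which the quoted hunk does not show.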
[GitHub] [kafka] mimaison commented on pull request #12494: MINOR: Update site docs for ASF compliance
mimaison commented on PR #12494: URL: https://github.com/apache/kafka/pull/12494#issuecomment-1209131631 The path of the images is `/{{version}}/images/`, so do we need to add them to this repo too?
[GitHub] [kafka] dajac commented on a diff in pull request #12487: KAFKA-14140: Ensure an offline or in-controlled-shutdown replica is not eligible to join ISR in ZK mode
dajac commented on code in PR #12487:
URL: https://github.com/apache/kafka/pull/12487#discussion_r941077938

## core/src/test/scala/unit/kafka/cluster/PartitionTest.scala: ##
@@ -1375,8 +1377,11 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
-  @Test
-  def testIsrNotExpandedIfReplicaIsFenced(): Unit = {
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testIsrNotExpandedIfReplicaIsFenced(quorum: String): Unit = {

Review Comment:
nit: Should we update the test name as well?

## core/src/main/scala/kafka/cluster/Partition.scala: ##
@@ -881,11 +881,14 @@ class Partition(val topicPartition: TopicPartition,
   private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
     metadataCache match {
       // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
-      // allowed to join the ISR. This does not apply to ZK mode.
+      // allowed to join the ISR. In ZK mode, we just ensure the broker is alive and not shutting down.

Review Comment:
This basically means that the leader will retry adding the shutting-down broker back to the ISR until the shutting-down broker is removed from the metadata cache. It is worth noting that, during this time, other replicas cannot be added back to the ISR. The controller rejects any ISR expansion containing at least one ineligible replica. This is why we added the in-controlled-shutdown state in KRaft. It allows the leader to filter them out as soon as possible. This may be acceptable here. Otherwise, we would have to propagate the shutting-down brokers via the UpdateMetadataRequest. What do others think?
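The interaction described in this comment can be illustrated with a toy model. The sketch below is not Kafka code: the class and method names are hypothetical, and it assumes the leader proposes a complete new ISR in one request. It shows why a leader that cannot see the shutting-down state keeps being rejected, while a leader that filters ineligible replicas first can still bring the other follower back.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

final class IsrExpansionSketch {

    /** Controller-side rule as described above: reject if any proposed member is ineligible. */
    static boolean controllerAccepts(Set<Integer> proposedIsr, Set<Integer> ineligibleBrokers) {
        return Collections.disjoint(proposedIsr, ineligibleBrokers);
    }

    /** Leader-side filter: drop the replicas the leader already knows to be ineligible. */
    static Set<Integer> filterEligible(Set<Integer> candidates, Set<Integer> knownIneligible) {
        Set<Integer> result = new TreeSet<>(candidates);
        result.removeAll(knownIneligible);
        return result;
    }

    public static void main(String[] args) {
        Set<Integer> caughtUp = Set.of(0, 1, 2);   // leader 0 plus followers 1 and 2
        Set<Integer> shuttingDown = Set.of(1);     // broker 1 is in controlled shutdown

        // Leader unaware of the shutdown: the whole expansion is rejected, so broker 2 stays out too.
        System.out.println(controllerAccepts(caughtUp, shuttingDown)); // prints false

        // Leader filters first: the reduced proposal {0, 2} is accepted.
        Set<Integer> filtered = filterEligible(caughtUp, shuttingDown);
        System.out.println(controllerAccepts(filtered, shuttingDown)); // prints true
    }
}
```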
## core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala: ##
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker = servers.find(_.config.brokerId != controllerId).get
+    val brokerId = otherBroker.config.brokerId
+    val tp = new TopicPartition("t", 0)
+    val assignment = Map(tp.partition -> Seq(controllerId, brokerId))
+    val fullIsr = List(controllerId, brokerId)
+    TestUtils.createTopic(zkClient, tp.topic, partitionReplicaAssignment = assignment, servers = servers)
+
+    // Shut down follower.
+    servers(brokerId).shutdown()
+    servers(brokerId).awaitShutdown()
+
+    val controller = getController().kafkaController
+    val leaderIsrAndControllerEpochMap = controller.controllerContext.partitionsLeadershipInfo
+    val leaderAndIsr = leaderIsrAndControllerEpochMap(tp).leaderAndIsr
+    val topicId = controller.controllerContext.topicIds(tp.topic)
+    val controllerEpoch = controller.controllerContext.liveBrokerIdAndEpochs(controllerId)
+
+    // We expect only the controller (online broker) to be in ISR
+    assertEquals(List(controllerId), leaderAndIsr.isr)
+
+    val requestTopic = new AlterPartitionRequestData.TopicData()
+      .setPartitions(Seq(new AlterPartitionRequestData.PartitionData()
+        .setPartitionIndex(tp.partition)
+        .setLeaderEpoch(leaderAndIsr.leaderEpoch)
+        .setPartitionEpoch(leaderAndIsr.partitionEpoch)
+        .setNewIsr(fullIsr.map(Int.box).asJava)
+        .setLeaderRecoveryState(leaderAndIsr.leaderRecoveryState.value)).asJava)
+    if (alterPartitionVersion > 1) requestTopic.setTopicId(topicId) else requestTopic.setTopicName(tp.topic)
+
+    // Try to update ISR to contain the offline broker.
+    val alterPartitionRequest = new AlterPartitionRequestData()
+      .setBrokerId(controllerId)
+      .setBrokerEpoch(controllerEpoch)
+      .setTopics(Seq(requestTopic).asJava)
+
+    val future = new CompletableFuture[AlterPartitionResponseData]()
+    controller.eventManager.put(AlterPartitionReceived(
+      alterPartitionRequest,
+      alterPartitionVersion,
+      future.complete
+    ))

Review Comment:
nit: This piece of code is used in multiple places now. I wonder if it is worth pulling it into a helper method. What do you think?

## core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala: ##
@@ -1045,6 +1049,78 @@ class ControllerIntegrationTest extends QuorumTestHarness {
     assertEquals(expectedAlterPartitionResponse, future.get(10, TimeUnit.SECONDS))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testAlterPartitionVersionSource"))
+  def testShutdownBrokerNotAddedToIsr(alterPartitionVersion: Short): Unit = {
+    servers = makeServers(2)
+    val controllerId = TestUtils.waitUntilControllerElected(zkClient)
+    val otherBroker
[GitHub] [kafka] yashmayya commented on a diff in pull request #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore
yashmayya commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r940928631

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ##
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
Tbh, it doesn't really seem like it's worth the mess of null handling everywhere. I'm gonna back out this change and make this a single line PR.

Btw, unrelated to this PR, I noticed that the task config generations aren't persisted to the config topic; they're simply maintained in the in-memory map. So, depending on worker restarts, different workers (herders, to be precise) could have different views of the task config generations for the same connector, because the config topic is compacted and older task commit records could be lost (the task config generation is incremented by 1 each time a task commit record is encountered). From what I can tell, task config generations are primarily used in two places:

i) We compare the task config generation from before an EOS task's startup with the task config generation after the task has initialized its transactional producer (and fail it if there's a mismatch, indicating that a new set of tasks has been brought up)
ii) At the end of a zombie fencing, if the task config generation is greater than the task config generation at the beginning of the zombie fencing, a `409` is returned by the fencing endpoint because a new set of tasks was brought up in the meantime

I'm struggling to think of any cases where this would cause an issue (i.e. different herders having different values for task config generations of the same connector) but I was hoping that you could verify my understanding here.
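The divergence described in this comment can be sketched with a toy model of a compacted log. This is not the Connect implementation: the names are hypothetical, commit records are reduced to the connector name they belong to, the generation is assumed to equal the number of commit records replayed, and compaction is assumed to keep only the latest record per connector.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

final class TaskConfigGenerationSketch {

    /** Replays commit records and counts one generation bump per record (assumed starting at 1). */
    static Map<String, Integer> replay(List<String> commitRecords) {
        Map<String, Integer> generations = new HashMap<>();
        for (String connector : commitRecords) {
            generations.merge(connector, 1, Integer::sum);
        }
        return generations;
    }

    /** Models log compaction: only the last record for each key (connector) survives. */
    static List<String> compact(List<String> commitRecords) {
        LinkedHashSet<String> survivors = new LinkedHashSet<>();
        for (String connector : commitRecords) {
            survivors.remove(connector); // drop the older record for this key
            survivors.add(connector);    // keep the newest one at its later position
        }
        return new ArrayList<>(survivors);
    }

    public static void main(String[] args) {
        List<String> log = List.of("a", "b", "a", "a");

        // A herder that read every record before compaction.
        System.out.println(replay(log).get("a"));          // prints 3

        // A herder that restarted after compaction and replayed what was left.
        System.out.println(replay(compact(log)).get("a")); // prints 1
    }
}
```

In this model the two herders disagree on the absolute generation, but each one still sees its own value grow when a new commit record arrives. Both uses listed in the comment compare generations read by the same herder at two points in time, which may be why the author found no failing case.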