[jira] [Created] (KAFKA-14424) Cancellation of an ongoing replica reassignment should have sanity checks
Lucas Wang created KAFKA-14424: -- Summary: Cancellation of an ongoing replica reassignment should have sanity checks Key: KAFKA-14424 URL: https://issues.apache.org/jira/browse/KAFKA-14424 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang When reassigning replicas, Kafka runs a sanity check to ensure all of the target replicas are alive before allowing the reassignment request to proceed. However, for an AlterPartitionReassignments request that cancels an ongoing reassignment, there is no such check. As a result, if the original replicas are offline, the cancellation may leave partitions without a leader. This problem has been observed in our clusters. There should be a similar sanity check to ensure the cancellation also lands the partitions in a valid state, e.g. by ensuring that all of the original replicas are alive. -- This message was sent by Atlassian Jira (v8.20.10#820010)
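For reference, a cancellation is issued through the Admin API by mapping a partition to Optional.empty(). The sketch below is only a client-side illustration of the kind of guard the ticket asks the controller to perform; the class name and the derivation of the "original" replicas (current replicas minus the ones being added) are assumptions made for illustration.
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

import java.util.*;
import java.util.stream.Collectors;

public class SafeReassignmentCancel {
    public static void cancelIfOriginalReplicasAlive(Admin admin, TopicPartition tp) throws Exception {
        PartitionReassignment ongoing =
                admin.listPartitionReassignments(Set.of(tp)).reassignments().get().get(tp);
        if (ongoing == null) {
            return; // nothing to cancel
        }
        // "Original" replicas: current replicas minus the ones being added by the reassignment.
        Set<Integer> original = new HashSet<>(ongoing.replicas());
        original.removeAll(ongoing.addingReplicas());

        Set<Integer> liveBrokers = admin.describeCluster().nodes().get().stream()
                .map(Node::id).collect(Collectors.toSet());
        if (!liveBrokers.containsAll(original)) {
            throw new IllegalStateException("Not cancelling " + tp
                    + ": some original replicas are offline: " + original);
        }
        // Mapping the partition to Optional.empty() asks the controller to cancel the ongoing reassignment.
        admin.alterPartitionReassignments(
                Map.of(tp, Optional.<NewPartitionReassignment>empty())).all().get();
    }
}
{code}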
[jira] [Assigned] (KAFKA-14381) Support listing all partitions being reassigned in a cluster
[ https://issues.apache.org/jira/browse/KAFKA-14381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-14381: -- Assignee: Lucas Wang > Support listing all partitions being reassigned in a cluster > > > Key: KAFKA-14381 > URL: https://issues.apache.org/jira/browse/KAFKA-14381 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > The current implementation of kafka-topics.sh doesn't support listing all of > the partitions that are being reassigned within a cluster. > Showing such info can be really useful during troubleshooting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14381) Support listing all partitions being reassigned in a cluster
Lucas Wang created KAFKA-14381: -- Summary: Support listing all partitions being reassigned in a cluster Key: KAFKA-14381 URL: https://issues.apache.org/jira/browse/KAFKA-14381 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang The current implementation of kafka-topics.sh doesn't support listing all of the partitions that are being reassigned within a cluster. Showing such info can be really useful during troubleshooting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
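For context, the Admin API can already return every ongoing reassignment in the cluster, which is roughly the information such a listing would surface; a minimal sketch (class name and bootstrap address are placeholders):
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class ListOngoingReassignments {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // With no argument, all ongoing reassignments in the cluster are returned.
            Map<TopicPartition, PartitionReassignment> ongoing =
                    admin.listPartitionReassignments().reassignments().get();
            ongoing.forEach((tp, r) ->
                    System.out.printf("%s replicas=%s adding=%s removing=%s%n",
                            tp, r.replicas(), r.addingReplicas(), r.removingReplicas()));
        }
    }
}
{code}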
[jira] [Updated] (KAFKA-14213) Reduce lock contention between control-plane-kafka-request-handler and kafka-log-cleaner-thread
[ https://issues.apache.org/jira/browse/KAFKA-14213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-14213: --- Description: We found that the StopReplica request's local processing time may be quite long due to the reasons explained below. The impact of a time-consuming StopReplica request is that all subsequent requests from the controller are blocked, causing slow convergence on the metadata plane. The long local processing time is because the control-plane-kafka-request-handler thread is blocked on a lock to abort logCleaning with the following stack trace: {code:java} "control-plane-kafka-request-handler-0" java.lang.Thread.State: WAITING at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method) at java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240) at java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267) at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266) at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205) at kafka.log.LogManager.asyncDelete(LogManager.scala:1039) {code} In the mean time, the kafka-log-cleaner-thread is holding the lock and busy figuring out the filthiest compacted log, which can be a very time consuming task. Our periodic thread dumps captured many snapshots with the kafka-log-cleaner-thread in either of the following 2 stack traces: {code:java} "kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at kafka.log.TimeIndex.(TimeIndex.scala:57) at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109) at kafka.log.LazyIndex$$$Lambda$1871/0x000800f4d840.apply(Unknown Source) at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63) at kafka.log.LazyIndex.get(LazyIndex.scala:60) at kafka.log.LogSegment.timeIndex(LogSegment.scala:66) at kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107) at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113) at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager$$$Lambda$2215/0x00080104d040.apply(Unknown Source) at scala.collection.Iterator.find(Iterator.scala:993) at scala.collection.Iterator.find$(Iterator.scala:990) at scala.collection.AbstractIterator.find(Iterator.scala:1429) at scala.collection.IterableLike.find(IterableLike.scala:81) at scala.collection.IterableLike.find$(IterableLike.scala:80) at scala.collection.AbstractIterable.find(Iterable.scala:56) at kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186) at kafka.log.LogCleanerManager$$Lambda$2212/0x00080104f040.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.TraversableLike$$Lambda$891/0x000800a89840.apply(Unknown Source) at scala.collection.immutable.List.foreach(List.scala:392) {code} {code:java} 
"kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method) at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54) at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42) at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35) at org.apache.kafka.common.record.Re
[jira] [Created] (KAFKA-14213) Reduce lock contention between control-plane-kafka-request-handler and kafka-log-cleaner-thread
Lucas Wang created KAFKA-14213: -- Summary: Reduce lock contention between control-plane-kafka-request-handler and kafka-log-cleaner-thread Key: KAFKA-14213 URL: https://issues.apache.org/jira/browse/KAFKA-14213 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang We found that the StopReplica request's local processing time may be quite long due to the reasons explained below. The impact of a time-consuming StopReplica request is that all subsequent requests from the controller are blocked, causing slow convergence on the metadata plane. The long local processing time is because the control-plane-kafka-request-handler thread is blocked on a lock to abort log cleaning with the following stack trace: {code:java} "control-plane-kafka-request-handler-0" java.lang.Thread.State: WAITING at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method) at java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917) at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240) at java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267) at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266) at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205) at kafka.log.LogManager.asyncDelete(LogManager.scala:1039) {code} In the meantime, the kafka-log-cleaner-thread is holding the lock and busy figuring out the filthiest compacted log, which can be a very time-consuming task.
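For illustration, the following standalone sketch (not Kafka code; the class name, thread names, and timings are made up) reproduces the blocking pattern described above: one thread holds a ReentrantLock for a long scan while the other parks inside lock().
{code:java}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class CleanerLockContention {
    private static final ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Thread cleaner = new Thread(() -> {
            lock.lock();               // the "cleaner" grabs the lock...
            try {
                grabFilthiestLog();    // ...and holds it for the whole (slow) scan
            } finally {
                lock.unlock();
            }
        }, "cleaner");

        Thread requestHandler = new Thread(() -> {
            long start = System.nanoTime();
            lock.lock();               // the "request handler" parks here, like the WAITING stack trace above
            try {
                System.out.printf("abort waited %d ms%n",
                        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
            } finally {
                lock.unlock();
            }
        }, "request-handler");

        cleaner.start();
        Thread.sleep(100);             // ensure the cleaner wins the lock first
        requestHandler.start();
        cleaner.join();
        requestHandler.join();
    }

    private static void grabFilthiestLog() {
        try {
            Thread.sleep(3_000);       // stand-in for scanning many segments and indexes
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}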
Our periodic thread dumps captured many snapshots with the kafka-log-cleaner-thread in either of the following 2 stack traces: {code:java} "kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at kafka.log.TimeIndex.(TimeIndex.scala:57) at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109) at kafka.log.LazyIndex$$$Lambda$1871/0x000800f4d840.apply(Unknown Source) at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63) at kafka.log.LazyIndex.get(LazyIndex.scala:60) at kafka.log.LogSegment.timeIndex(LogSegment.scala:66) at kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107) at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113) at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617) at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager$$$Lambda$2215/0x00080104d040.apply(Unknown Source) at scala.collection.Iterator.find(Iterator.scala:993) at scala.collection.Iterator.find$(Iterator.scala:990) at scala.collection.AbstractIterator.find(Iterator.scala:1429) at scala.collection.IterableLike.find(IterableLike.scala:81) at scala.collection.IterableLike.find$(IterableLike.scala:80) at scala.collection.AbstractIterable.find(Iterable.scala:56) at kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616) at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186) at kafka.log.LogCleanerManager$$Lambda$2212/0x00080104f040.apply(Unknown Source) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238) at scala.collection.TraversableLike$$Lambda$891/0x000800a89840.apply(Unknown Source) at scala.collection.immutable.List.foreach(List.scala:392) {code} {code:java} "kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method) at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54) at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811) at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69) at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42) at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35) at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24) at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79) at o
[jira] [Assigned] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted
[ https://issues.apache.org/jira/browse/KAFKA-13815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-13815: -- Assignee: Lucas Wang > Avoid reinitialization for a replica that is being deleted > -- > > Key: KAFKA-13815 > URL: https://issues.apache.org/jira/browse/KAFKA-13815 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > https://issues.apache.org/jira/browse/KAFKA-10002 > identified that deletion of replicas can be slow when a StopReplica request > is being > processed, and has implemented a change to improve the efficiency. > We found that the efficiency can be further improved by avoiding the > reinitialization of the > leader epoch cache and partition metadata for a replica that needs to be > deleted. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted
Lucas Wang created KAFKA-13815: -- Summary: Avoid reinitialization for a replica that is being deleted Key: KAFKA-13815 URL: https://issues.apache.org/jira/browse/KAFKA-13815 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang https://issues.apache.org/jira/browse/KAFKA-10002 identified that deletion of replicas can be slow when a StopReplica request is being processed, and has implemented a change to improve the efficiency. We found that the efficiency can be further improved by avoiding the reinitialization of the leader epoch cache and partition metadata for a replica that needs to be deleted. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13797) Adding metric to indicate metadata response outgoing bytes rate
[ https://issues.apache.org/jira/browse/KAFKA-13797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-13797: --- Description: It's not a common case, but we experienced the following problem in one of our clusters. The use case involves dynamically creating and deleting topics in the cluster, and the clients were constantly using the special type of Metadata requests whose topics field is null in order to retrieve all topics before checking a topic's existence. A high rate of such Metadata requests generated a heavy load on brokers in the cluster. Yet, currently, there is no metric to indicate the metadata response outgoing bytes rate. We propose to add such a metric in order to make the troubleshooting of such cases easier. was: It's not a common case, but we experienced the following problem in one of our clusters. The use case involves dynamically creating and deleting topics in the cluster, and the clients were constantly checking if a topic exists in a cluster using the special type of Metadata requests whose topics field is null in order to retrieve all topics before checking a topic's existence. A high rate of such Metadata requests generated a heavy load on brokers in the cluster. Yet, currently, there is no metric to indicate the metadata response outgoing bytes rate. We propose to add such a metric in order to make the troubleshooting of such cases easier. > Adding metric to indicate metadata response outgoing bytes rate > --- > > Key: KAFKA-13797 > URL: https://issues.apache.org/jira/browse/KAFKA-13797 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Priority: Minor > > It's not a common case, but we experienced the following problem in one of > our clusters. > The use case involves dynamically creating and deleting topics in the > cluster, and the clients were constantly using the special type of Metadata > requests whose topics field is null in order to retrieve all topics before > checking a topic's existence. > A high rate of such Metadata requests generated a heavy load on brokers in > the cluster. > Yet, currently, there is no metric to indicate the metadata response outgoing > bytes rate. > We propose to add such a metric in order to make the troubleshooting of such > cases easier. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13797) Adding metric to indicate metadata response outgoing bytes rate
Lucas Wang created KAFKA-13797: -- Summary: Adding metric to indicate metadata response outgoing bytes rate Key: KAFKA-13797 URL: https://issues.apache.org/jira/browse/KAFKA-13797 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang It's not a common case, but we experienced the following problem in one of our clusters. The use case involves dynamically creating and deleting topics in the cluster, and the clients were constantly checking whether a topic exists by issuing the special type of Metadata request whose topics field is null, which retrieves all topics in the cluster. A high rate of such Metadata requests generated a heavy load on brokers in the cluster. Yet, currently, there is no metric to indicate the metadata response outgoing bytes rate. We propose to add such a metric in order to make the troubleshooting of such cases easier. -- This message was sent by Atlassian Jira (v8.20.1#820001)
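As an illustration of the client pattern described above (method names are hypothetical, and allTopicNames() assumes Kafka clients 3.1+), the first method below triggers a full-topic Metadata request on every existence check, while the second asks the brokers about a single topic:
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import java.util.List;
import java.util.concurrent.ExecutionException;

public class TopicExistenceCheck {

    // Pattern from the ticket: list every topic (a Metadata request for all topics) and search the result.
    static boolean existsViaFullMetadata(Admin admin, String topic) throws Exception {
        return admin.listTopics().names().get().contains(topic);
    }

    // Cheaper alternative: describe just the one topic and treat "unknown topic" as absence.
    static boolean existsViaSingleTopicDescribe(Admin admin, String topic) throws InterruptedException {
        try {
            admin.describeTopics(List.of(topic)).allTopicNames().get();
            return true;
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return false;
            }
            throw new RuntimeException(e.getCause());
        }
    }
}
{code}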
[jira] [Created] (KAFKA-13188) Release the memory back into MemoryPool
Lucas Wang created KAFKA-13188: -- Summary: Release the memory back into MemoryPool Key: KAFKA-13188 URL: https://issues.apache.org/jira/browse/KAFKA-13188 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Tushar made a [hotfix change|https://github.com/linkedin/kafka/pull/186] to the linkedin/kafka repo hosting Apache Kafka 2.4. The change releases memory back to the MemoryPool in the Kafka consumer, and his benchmark showed a significant reduction in the amount of memory graduating from the Young Gen and being promoted to the Old Gen. Given the benefit, the change should also be added to trunk. -- This message was sent by Atlassian Jira (v8.3.4#803005)
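A minimal sketch of the allocate/use/release discipline the hotfix applies, written against Kafka's internal org.apache.kafka.common.memory.MemoryPool interface; the method and its parameters are illustrative, not the actual consumer code:
{code:java}
import java.nio.ByteBuffer;
import org.apache.kafka.common.memory.MemoryPool;

public class PooledBufferUsage {
    static void readResponse(MemoryPool pool, int sizeBytes) {
        ByteBuffer buffer = pool.tryAllocate(sizeBytes);   // may return null if the pool is exhausted
        if (buffer == null) {
            return; // back off and retry later
        }
        try {
            // ... fill the buffer from the network and parse the response ...
        } finally {
            // Returning the buffer lets it be reused instead of becoming garbage that is
            // promoted out of the young generation, which is the effect the benchmark measured.
            pool.release(buffer);
        }
    }
}
{code}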
[jira] [Commented] (KAFKA-10548) Implement deletion logic for LeaderAndIsrRequests
[ https://issues.apache.org/jira/browse/KAFKA-10548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388367#comment-17388367 ] Lucas Wang commented on KAFKA-10548: Hi [~jolshan] thanks for KIP-516 and for keeping track of the tasks. At LinkedIn, we are very interested in adopting the improved topic deletion logic. Have you started or planned to work on this ticket? If not, do you mind if I take it over? > Implement deletion logic for LeaderAndIsrRequests > - > > Key: KAFKA-10548 > URL: https://issues.apache.org/jira/browse/KAFKA-10548 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Priority: Major > > This will allow for specialized deletion logic when receiving > LeaderAndIsrRequests > Will also create and utilize delete.stale.topic.delay.ms configuration option -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException
[ https://issues.apache.org/jira/browse/KAFKA-12315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-12315: --- Description: As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen 1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap 2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr 3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to zkVersion check failure 4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created: {code:java} 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139{code} 5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign At this point, the controller has resigned, however the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the left over state: {code:java} 2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Error while moving some replicas to OnlineReplica state java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some StopReplica state changes Map(6121 -> ListB\ uffer(StopRepl\ icaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.elect(KafkaController.scala:1484) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.process(KafkaController.scala:2065) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137) ~[kafka_2.12-2.4.1.10.jar:?] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [scala-library-2.12.10.jar:?] at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) [kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137) [kafka_2.12-2.4.1.10.jar:?] at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [kafka_2.12-2.4.1.10.jar:?]{code} Essentially, the controller is not able to transition some replicas to OnlineReplica state, and it cannot send any requests to any brokers via the ReplicaStateMachine. 
was: As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen 1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap 2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr 3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to zkVersion check failure 4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created: 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139 5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign At this point, the controller has resigned, however the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the left over state: {code:java} 2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMa
[jira] [Updated] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException
[ https://issues.apache.org/jira/browse/KAFKA-12315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-12315: --- Description: As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen 1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap 2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr 3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to zkVersion check failure 4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created: 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139 5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign At this point, the controller has resigned, however the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the left over state: {code:java} 2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Error while moving some replicas to OnlineReplica state java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some StopReplica state changes Map(6121 -> ListB\ uffer(StopRepl\ icaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.elect(KafkaController.scala:1484) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.process(KafkaController.scala:2065) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137) ~[kafka_2.12-2.4.1.10.jar:?] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [scala-library-2.12.10.jar:?] at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) [kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137) [kafka_2.12-2.4.1.10.jar:?] at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [kafka_2.12-2.4.1.10.jar:?]{code} Essentially, the controller is not able to transition some replicas to OnlineReplica state, and it cannot send any requests to any brokers via the ReplicaStateMachine. 
was: As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen 1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap 2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr 3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to zkVersion check failure 4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created: 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139 5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign At this point, the controller has resigned, however the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the left over state: ``` 2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=3
[jira] [Created] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException
Lucas Wang created KAFKA-12315: -- Summary: Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException Key: KAFKA-12315 URL: https://issues.apache.org/jira/browse/KAFKA-12315 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Attachments: controller_moved_left_over_state.png As shown in the attached sequence diagram, during topic deletion the following sequence of events can happen 1. The ZkReplicaStateMachine calls AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and adds some entries to its stopReplicaRequestMap 2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr 3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a ControllerMovedException may be thrown due to zkVersion check failure 4. The ControllerMovedException is captured by the ZkPartitionStateMachine and an error such as the following is created: 2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Controller moved to another broker when moving some replicas to OfflineReplica state org.apache.kafka.common.errors.ControllerMovedException: Controller epoch zkVersion check fails. Expected zkVersion = 139 5. The ControllerMovedException is rethrown and captured by the KafkaController, which will resign At this point, the controller has resigned, however the stopReplicaRequestMap state populated in step 1 hasn't been cleared. Later on, when the controller wins an election and becomes the active controller again, an IllegalStateException will be triggered due to the left over state: ``` 2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine controllerId=31862] Error while moving some replicas to OnlineReplica state java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some StopReplica state changes Map(6121 -> ListB\ uffer(StopRepl\ icaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost at kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.elect(KafkaController.scala:1484) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.KafkaController.process(KafkaController.scala:2065) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) ~[kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137) ~[kafka_2.12-2.4.1.10.jar:?] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [scala-library-2.12.10.jar:?] at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) [kafka_2.12-2.4.1.10.jar:?] at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137) [kafka_2.12-2.4.1.10.jar:?] at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) [kafka_2.12-2.4.1.10.jar:?] 
``` Essentially, the controller is not able to transition some replicas to OnlineReplica state, and it cannot send any requests to any brokers via the ReplicaStateMachine. -- This message was sent by Atlassian Jira (v8.3.4#803005)
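The following Java sketch (hypothetical names; the real code is the Scala ZkReplicaStateMachine and AbstractControllerBrokerRequestBatch) illustrates the proposed fix: clear the half-built batch when the ControllerMovedException is handled, so a later newBatch() call does not hit the IllegalStateException shown above.
{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RequestBatch {
    private final Map<Integer, List<String>> stopReplicaRequests = new HashMap<>();

    void newBatch() {
        if (!stopReplicaRequests.isEmpty()) {
            throw new IllegalStateException(
                    "Controller to broker state change requests batch is not empty while creating a new one");
        }
    }

    void addStopReplicaRequestForBroker(int brokerId, String topicPartition) {
        stopReplicaRequests.computeIfAbsent(brokerId, id -> new ArrayList<>()).add(topicPartition);
    }

    void clear() {
        stopReplicaRequests.clear();
    }
}

class ControllerMovedException extends RuntimeException {}

class ReplicaStateMachineSketch {
    private final RequestBatch batch = new RequestBatch();

    void handleStateChanges(Runnable transitions) {
        batch.newBatch();            // fails if stale entries were left behind by a previous batch
        try {
            transitions.run();       // may add requests, then throw ControllerMovedException mid-batch
        } catch (ControllerMovedException e) {
            batch.clear();           // proposed fix: drop the half-built batch before the controller resigns
            throw e;                 // let the controller resign as before
        }
    }
}
{code}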
[jira] [Commented] (KAFKA-10751) Generate log to help estimate messages lost during ULE
[ https://issues.apache.org/jira/browse/KAFKA-10751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235702#comment-17235702 ] Lucas Wang commented on KAFKA-10751: PR submitted: https://github.com/apache/kafka/pull/9533 > Generate log to help estimate messages lost during ULE > -- > > Key: KAFKA-10751 > URL: https://issues.apache.org/jira/browse/KAFKA-10751 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > During Unclean Leader Election, there could be data loss due to truncation at > the resigned leader. > Suppose there are 3 brokers that has replicas for a given partition: > Broker A (leader) with largest offset 9 (log end offset 10) > Broker B (follower) with largest offset 4 (log end offset 5) > Broker C (follower) with largest offset 1 (log end offset 2) > Only the leader A is in the ISR with B and C lagging behind. > Now an unclean leader election causes the leadership to be transferred to C. > Broker A would need to truncate 8 messages, and Broker B 3 messages. > Case 1: if these messages have been produced with acks=0 or 1, then clients > would experience 8 lost messages. > Case 2: if the client is using acks=all and the partition's minISR setting is > 2, and further let's assume broker B dropped out of the ISR after receiving > the message with offset 4, then only the messages with offset<=4 have been > acked to the client. The truncation effectively causes the client to lose 3 > messages. > Knowing the exact amount of data loss involves knowing the client's acks > setting when the messages are produced, and also whether the messages have > been sufficiently replicated according to the MinISR setting. > If getting the exact data loss is too involved, at least there should be logs > to help ESTIMATE the amount of data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10751) Generate log to help estimate messages lost during ULE
Lucas Wang created KAFKA-10751: -- Summary: Generate log to help estimate messages lost during ULE Key: KAFKA-10751 URL: https://issues.apache.org/jira/browse/KAFKA-10751 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang During Unclean Leader Election, there could be data loss due to truncation at the resigned leader. Suppose there are 3 brokers that have replicas for a given partition:
Broker A (leader) with largest offset 9 (log end offset 10)
Broker B (follower) with largest offset 4 (log end offset 5)
Broker C (follower) with largest offset 1 (log end offset 2)
Only the leader A is in the ISR, with B and C lagging behind. Now an unclean leader election causes the leadership to be transferred to C. Broker A would need to truncate 8 messages, and Broker B 3 messages. Case 1: if these messages have been produced with acks=0 or 1, then clients would experience 8 lost messages. Case 2: if the client is using acks=all and the partition's minISR setting is 2, and further let's assume broker B dropped out of the ISR after receiving the message with offset 4, then only the messages with offset<=4 have been acked to the client. The truncation effectively causes the client to lose 3 messages. Knowing the exact amount of data loss involves knowing the client's acks setting when the messages are produced, and also whether the messages have been sufficiently replicated according to the MinISR setting. If getting the exact data loss is too involved, at least there should be logs to help ESTIMATE the amount of data loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
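A quick worked check of the numbers above (offsets taken from the description; the estimate simply subtracts the new leader's log end offset, matching how the example counts truncated messages):
{code:java}
public class UleTruncationEstimate {
    public static void main(String[] args) {
        long leoA = 10, leoB = 5, leoNewLeaderC = 2;   // log end offsets from the description
        // Each surviving replica truncates back to the new leader's log end offset in this example.
        System.out.println("Broker A truncates " + (leoA - leoNewLeaderC) + " messages"); // 8
        System.out.println("Broker B truncates " + (leoB - leoNewLeaderC) + " messages"); // 3
    }
}
{code}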
[jira] [Commented] (KAFKA-10734) Speedup the processing of LeaderAndIsr request
[ https://issues.apache.org/jira/browse/KAFKA-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235061#comment-17235061 ] Lucas Wang commented on KAFKA-10734: Thanks for your reply [~junrao]. In some clusters, our SREs set the num.replica.fetchers to be much greater than 1, probably for the sake of higher replication throughput. > Speedup the processing of LeaderAndIsr request > -- > > Key: KAFKA-10734 > URL: https://issues.apache.org/jira/browse/KAFKA-10734 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > Consider the case where a LeaderAndIsr request contains many partitions, of > which the broker is asked to become the follower. Let's call these partitions > *partitionsToMakeFollower*. Further more, let's assume the cluster has n > brokers and each broker is configured to have m replica fetchers (via the > num.replica.fetchers config). > The broker is likely to have (n-1) * m fetcher threads. > Processing the LeaderAndIsr request requires > 1. removing the "partitionsToMakeFollower" from all of the fetcher threads > sequentially so that they won't be fetching from obsolete leaders. > 2. adding the "partitionsToMakeFollower" to all of the fetcher threads > sequentially > 3. shutting down the idle fetcher threads sequentially (by checking the > number of partitions held by each fetcher thread) > On top of that, for each of the 3 operations above, the operation is handled > by the request handler thread (i.e. io thread). And to complete the > operation, the request handler thread needs to contend for the > "partitionMapLock" with the corresponding fetcher thread. In the worst case, > the request handler thread is blocked for (n-1) * m times for removing the > partitions, another (n-1) * m times for adding the partitions, and yet > another (n-1) * m times for shutting down the idle fetcher threads. > Overall, all of the blocking can result in a significant delay in processing > the LeaderAndIsr request. The further implication is that if the follower > delays its fetching from the leader, there could be under MinISR partitions > in the cluster, causing unavailability for clients. > This ticket is created to track speedup in the processing of the LeaderAndIsr > request. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10734) Speedup the processing of LeaderAndIsr request
Lucas Wang created KAFKA-10734: -- Summary: Speedup the processing of LeaderAndIsr request Key: KAFKA-10734 URL: https://issues.apache.org/jira/browse/KAFKA-10734 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang Consider the case where a LeaderAndIsr request contains many partitions, for which the broker is asked to become the follower. Let's call these partitions *partitionsToMakeFollower*. Furthermore, let's assume the cluster has n brokers and each broker is configured to have m replica fetchers (via the num.replica.fetchers config). The broker is likely to have (n-1) * m fetcher threads. Processing the LeaderAndIsr request requires
1. removing the "partitionsToMakeFollower" from all of the fetcher threads sequentially so that they won't be fetching from obsolete leaders.
2. adding the "partitionsToMakeFollower" to all of the fetcher threads sequentially
3. shutting down the idle fetcher threads sequentially (by checking the number of partitions held by each fetcher thread)
On top of that, each of the 3 operations above is handled by the request handler thread (i.e. io thread), and to complete the operation, the request handler thread needs to contend for the "partitionMapLock" with the corresponding fetcher thread. In the worst case, the request handler thread is blocked (n-1) * m times for removing the partitions, another (n-1) * m times for adding the partitions, and yet another (n-1) * m times for shutting down the idle fetcher threads. Overall, all of this blocking can result in a significant delay in processing the LeaderAndIsr request. The further implication is that if the follower delays its fetching from the leader, there could be under-MinISR partitions in the cluster, causing unavailability for clients. This ticket is created to track speeding up the processing of the LeaderAndIsr request. -- This message was sent by Atlassian Jira (v8.3.4#803005)
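A worked instance of the bound described above, with n and m chosen only for illustration:
{code:java}
public class LeaderAndIsrBlockingBound {
    public static void main(String[] args) {
        int n = 31;                       // brokers in the cluster (assumed)
        int m = 4;                        // num.replica.fetchers (assumed)
        int fetcherThreads = (n - 1) * m; // 120 fetcher threads on one broker
        // remove + add + shutdown-idle each contend once per fetcher thread in the worst case
        int worstCaseBlockingPoints = 3 * fetcherThreads; // 360
        System.out.printf("fetcher threads = %d, worst-case blocking points = %d%n",
                fetcherThreads, worstCaseBlockingPoints);
    }
}
{code}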
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738587#comment-16738587 ] Lucas Wang commented on KAFKA-7040: --- Hi [~apovzner], thanks for the info. Please go ahead and take over that task. Thanks! > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug > Components: replication >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-4453: - Assignee: Mayuresh Gharat (was: Lucas Wang) > add request prioritization > -- > > Key: KAFKA-4453 > URL: https://issues.apache.org/jira/browse/KAFKA-4453 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Mayuresh Gharat >Priority: Major > > Today all requests (client requests, broker requests, controller requests) to > a broker are put into the same queue. They all have the same priority. So a > backlog of requests ahead of the controller request will delay the processing > of controller requests. This causes requests infront of the controller > request to get processed based on stale state. > Side effects may include giving clients stale metadata\[1\], rejecting > ProduceRequests and FetchRequests\[2\], and data loss (for some > unofficial\[3\] definition of data loss in terms of messages beyond the high > watermark)\[4\]. > We'd like to minimize the number of requests processed based on stale state. > With request prioritization, controller requests get processed before regular > queued up requests, so requests can get processed with up-to-date state. > \[1\] Say a client's MetadataRequest is sitting infront of a controller's > UpdateMetadataRequest on a given broker's request queue. Suppose the > MetadataRequest is for a topic whose partitions have recently undergone > leadership changes and that these leadership changes are being broadcasted > from the controller in the later UpdateMetadataRequest. Today the broker > processes the MetadataRequest before processing the UpdateMetadataRequest, > meaning the metadata returned to the client will be stale. The client will > waste a roundtrip sending requests to the stale partition leader, get a > NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the > topic metadata again. > \[2\] Clients can issue ProduceRequests to the wrong broker based on stale > metadata, causing rejected ProduceRequests. Based on how long the client acts > based on the stale metadata, the impact may or may not be visible to a > producer application. If the number of rejected ProduceRequests does not > exceed the max number of retries, the producer application would not be > impacted. On the other hand, if the retries are exhausted, the failed produce > will be visible to the producer application. > \[3\] The official definition of data loss in kafka is when we lose a > "committed" message. A message is considered "committed" when all in sync > replicas for that partition have applied it to their log. > \[4\] Say a number of ProduceRequests are sitting infront of a controller's > LeaderAndIsrRequest on a given broker's request queue. Suppose the > ProduceRequests are for partitions whose leadership has recently shifted out > from the current broker to another broker in the replica set. Today the > broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning > the ProduceRequests are getting processed on the former partition leader. As > part of becoming a follower for a partition, the broker truncates the log to > the high-watermark. With weaker ack settings such as acks=1, the leader may > successfully write to its own log, respond to the user with a success, > process the LeaderAndIsrRequest making the broker a follower of the > partition, and truncate the log to a point before the user's produced > messages. 
So users have a false sense that their produce attempt succeeded > while in reality their messages got erased. While technically part of what > they signed up for with acks=1, it can still come as a surprise. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
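The following sketch illustrates the general idea of request prioritization (it is not Kafka's implementation): controller requests get their own queue, and handler threads drain it first so they are never stuck behind a backlog of data-plane requests.
{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PrioritizedRequestChannel {
    private final BlockingQueue<Runnable> controllerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<Runnable> dataQueue = new LinkedBlockingQueue<>();

    public void sendControllerRequest(Runnable r) { controllerQueue.add(r); }
    public void sendDataRequest(Runnable r)       { dataQueue.add(r); }

    /** Called by a request handler thread: controller work always wins. May return null; callers retry. */
    public Runnable receive() throws InterruptedException {
        Runnable r = controllerQueue.poll();
        if (r != null) {
            return r;
        }
        // Poll the data queue with a short timeout so newly arrived controller
        // requests are picked up promptly instead of waiting behind a blocking take().
        r = dataQueue.poll(300, TimeUnit.MILLISECONDS);
        return r != null ? r : controllerQueue.poll();
    }
}
{code}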
[jira] [Assigned] (KAFKA-7350) Improve or remove the PreferredReplicaImbalanceCount metric
[ https://issues.apache.org/jira/browse/KAFKA-7350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-7350: - Assignee: Ahmed Al-Mehdi (was: Lucas Wang) > Improve or remove the PreferredReplicaImbalanceCount metric > --- > > Key: KAFKA-7350 > URL: https://issues.apache.org/jira/browse/KAFKA-7350 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Ahmed Al-Mehdi >Priority: Major > > In KAFKA-6753, we identified that in the ControllerEventManager, updating two > metrics after processing every controller event ends up consuming too much > CPU. The first metric OfflinePartitionCount is resolved in KAFKA-6753, and > this ticket is for tracking progress on the 2nd metric > PreferredReplicaImbalanceCount. > The options we have about this metric include > 1. Remove this metric given that if necessary, the value of this metric can > be derived by getting the metadata of all topics in the cluster > 2. Piggyback the update of the metric every time the auto leader balancer > runs. The benefit is keeping this metric. However the downside is that this > metric may then get obsolete and incorrect depending on when it's checked. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6753: -- Description: The existing controller code updates metrics after processing every event. This can slow down event processing on the controller tremendously. In one profiling we see that updating metrics takes nearly 100% of the CPU for the controller event processing thread. Specifically the slowness can be attributed to two factors: 1. Each invocation to update the metrics is expensive. Specifically trying to calculate the offline partitions count requires iterating through all the partitions in the cluster to check if the partition is offline; and calculating the preferred replica imbalance count requires iterating through all the partitions in the cluster to check if a partition has a leader other than the preferred leader. In a large cluster, the number of partitions can be quite large, all seen by the controller. Even if the time spent to check a single partition is small, the accumulation effect of so many partitions in the cluster can make the invocation to update metrics quite expensive. One might argue that maybe the logic for processing each single partition is not optimized, we checked the CPU percentage of leaf nodes in the profiling result, and found that inside the loops of collection objects, e.g. the set of all partitions, no single function dominates the processing. Hence the large number of the partitions in a cluster is the main contributor to the slowness of one invocation to update the metrics. 2. The invocation to update metrics is called many times when the is a high number of events to be processed by the controller, one invocation after processing any event. This ticket is used to track how we change the logic for the OfflinePartitionCount metric. And KAFKA-7350 will be used to track progress for the PreferredReplicaImbalanceCount metric. was: The existing controller code updates metrics after processing every event. This can slow down event processing on the controller tremendously. In one profiling we see that updating metrics takes nearly 100% of the CPU for the controller event processing thread. Specifically the slowness can be attributed to two factors: 1. Each invocation to update the metrics is expensive. Specifically trying to calculate the offline partitions count requires iterating through all the partitions in the cluster to check if the partition is offline; and calculating the preferred replica imbalance count requires iterating through all the partitions in the cluster to check if a partition has a leader other than the preferred leader. In a large cluster, the number of partitions can be quite large, all seen by the controller. Even if the time spent to check a single partition is small, the accumulation effect of so many partitions in the cluster can make the invocation to update metrics quite expensive. One might argue that maybe the logic for processing each single partition is not optimized, we checked the CPU percentage of leaf nodes in the profiling result, and found that inside the loops of collection objects, e.g. the set of all partitions, no single function dominates the processing. Hence the large number of the partitions in a cluster is the main contributor to the slowness of one invocation to update the metrics. 2. The invocation to update metrics is called many times when the is a high number of events to be processed by the controller, one invocation after processing any event. 
This ticket is used to track how we change the logic for the OfflinePartitionCount metric. > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can
[jira] [Created] (KAFKA-7350) Improve or remove the PreferredReplicaImbalanceCount metric
Lucas Wang created KAFKA-7350: - Summary: Improve or remove the PreferredReplicaImbalanceCount metric Key: KAFKA-7350 URL: https://issues.apache.org/jira/browse/KAFKA-7350 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang In KAFKA-6753, we identified that in the ControllerEventManager, updating two metrics after processing every controller event ends up consuming too much CPU. The first metric OfflinePartitionCount is resolved in KAFKA-6753, and this ticket is for tracking progress on the 2nd metric PreferredReplicaImbalanceCount. The options we have about this metric include 1. Remove this metric given that if necessary, the value of this metric can be derived by getting the metadata of all topics in the cluster 2. Piggyback the update of the metric every time the auto leader balancer runs. The benefit is keeping this metric. However the downside is that this metric may then get obsolete and incorrect depending on when it's checked. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
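A sketch of option 1 above: derive the imbalance count on demand from topic metadata via the Admin API, counting partitions whose current leader is not the first (preferred) replica. The class and method names are illustrative, and allTopicNames() assumes Kafka clients 3.1+.
{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class PreferredReplicaImbalance {
    public static long count(Admin admin) throws Exception {
        long imbalanced = 0;
        for (TopicDescription topic :
                admin.describeTopics(admin.listTopics().names().get()).allTopicNames().get().values()) {
            for (TopicPartitionInfo p : topic.partitions()) {
                int preferredLeader = p.replicas().get(0).id();
                if (p.leader() == null || p.leader().id() != preferredLeader) {
                    imbalanced++;
                }
            }
        }
        return imbalanced;
    }
}
{code}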
[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6753: -- Description: The existing controller code updates metrics after processing every event. This can slow down event processing on the controller tremendously. In one profiling we see that updating metrics takes nearly 100% of the CPU for the controller event processing thread. Specifically the slowness can be attributed to two factors: 1. Each invocation to update the metrics is expensive. Specifically trying to calculate the offline partitions count requires iterating through all the partitions in the cluster to check if the partition is offline; and calculating the preferred replica imbalance count requires iterating through all the partitions in the cluster to check if a partition has a leader other than the preferred leader. In a large cluster, the number of partitions can be quite large, all seen by the controller. Even if the time spent to check a single partition is small, the accumulation effect of so many partitions in the cluster can make the invocation to update metrics quite expensive. One might argue that maybe the logic for processing each single partition is not optimized, we checked the CPU percentage of leaf nodes in the profiling result, and found that inside the loops of collection objects, e.g. the set of all partitions, no single function dominates the processing. Hence the large number of the partitions in a cluster is the main contributor to the slowness of one invocation to update the metrics. 2. The invocation to update metrics is called many times when the is a high number of events to be processed by the controller, one invocation after processing any event. This ticket is used to track how we change the logic for the OfflinePartitionCount metric. was: The existing controller code updates metrics after processing every event. This can slow down event processing on the controller tremendously. In one profiling we see that updating metrics takes nearly 100% of the CPU for the controller event processing thread. Specifically the slowness can be attributed to two factors: 1. Each invocation to update the metrics is expensive. Specifically trying to calculate the offline partitions count requires iterating through all the partitions in the cluster to check if the partition is offline; and calculating the preferred replica imbalance count requires iterating through all the partitions in the cluster to check if a partition has a leader other than the preferred leader. In a large cluster, the number of partitions can be quite large, all seen by the controller. Even if the time spent to check a single partition is small, the accumulation effect of so many partitions in the cluster can make the invocation to update metrics quite expensive. One might argue that maybe the logic for processing each single partition is not optimized, we checked the CPU percentage of leaf nodes in the profiling result, and found that inside the loops of collection objects, e.g. the set of all partitions, no single function dominates the processing. Hence the large number of the partitions in a cluster is the main contributor to the slowness of one invocation to update the metrics. 2. The invocation to update metrics is called many times when the is a high number of events to be processed by the controller, one invocation after processing any event. 
> Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can > be quite large, all seen by the controller. Even if the time spent to check a > single partition is small, the accumulation effect of so many partitions in > the cluster can make th
[jira] [Resolved] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang resolved KAFKA-6753. --- Resolution: Fixed > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can > be quite large, all seen by the controller. Even if the time spent to check a > single partition is small, the accumulation effect of so many partitions in > the cluster can make the invocation to update metrics quite expensive. One > might argue that maybe the logic for processing each single partition is not > optimized, we checked the CPU percentage of leaf nodes in the profiling > result, and found that inside the loops of collection objects, e.g. the set > of all partitions, no single function dominates the processing. Hence the > large number of the partitions in a cluster is the main contributor to the > slowness of one invocation to update the metrics. > 2. The invocation to update metrics is called many times when the is a high > number of events to be processed by the controller, one invocation after > processing any event. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594328#comment-16594328 ] Lucas Wang commented on KAFKA-6753: --- [~junrao] Thanks will do. > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can > be quite large, all seen by the controller. Even if the time spent to check a > single partition is small, the accumulation effect of so many partitions in > the cluster can make the invocation to update metrics quite expensive. One > might argue that maybe the logic for processing each single partition is not > optimized, we checked the CPU percentage of leaf nodes in the profiling > result, and found that inside the loops of collection objects, e.g. the set > of all partitions, no single function dominates the processing. Hence the > large number of the partitions in a cluster is the main contributor to the > slowness of one invocation to update the metrics. > 2. The invocation to update metrics is called many times when the is a high > number of events to be processed by the controller, one invocation after > processing any event. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-7040: - Assignee: Lucas Wang > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug > Components: replication >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590634#comment-16590634 ] Lucas Wang commented on KAFKA-7040: --- [~lindong] Sure, I will own this ticket and work on the fix. > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug > Components: replication >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reopened KAFKA-6753: --- > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can > be quite large, all seen by the controller. Even if the time spent to check a > single partition is small, the accumulation effect of so many partitions in > the cluster can make the invocation to update metrics quite expensive. One > might argue that maybe the logic for processing each single partition is not > optimized, we checked the CPU percentage of leaf nodes in the profiling > result, and found that inside the loops of collection objects, e.g. the set > of all partitions, no single function dominates the processing. Hence the > large number of the partitions in a cluster is the main contributor to the > slowness of one invocation to update the metrics. > 2. The invocation to update metrics is called many times when the is a high > number of events to be processed by the controller, one invocation after > processing any event. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588186#comment-16588186 ] Lucas Wang commented on KAFKA-6753: --- [~junrao] Personally I'm in favor of removing this metric compared with giving possibly incorrect/stale metric. I can start the KIP to collect more feedback. Meanwhile I'll keep this ticket open for tracking progress of the remaining work. > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can > be quite large, all seen by the controller. Even if the time spent to check a > single partition is small, the accumulation effect of so many partitions in > the cluster can make the invocation to update metrics quite expensive. One > might argue that maybe the logic for processing each single partition is not > optimized, we checked the CPU percentage of leaf nodes in the profiling > result, and found that inside the loops of collection objects, e.g. the set > of all partitions, no single function dominates the processing. Hence the > large number of the partitions in a cluster is the main contributor to the > slowness of one invocation to update the metrics. > 2. The invocation to update metrics is called many times when the is a high > number of events to be processed by the controller, one invocation after > processing any event. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587994#comment-16587994 ] Lucas Wang commented on KAFKA-6753: --- Hi [~junrao] thanks reviewing the PR. Besides the offlinePartitionCount metric, the preferredReplicaImbalanceCount metric is another contributing factor for the observed problem. Making a similar change on this metric seems a quite involved effort since it depends on the set of partitions, the partition replica assignments, leadership changes, and topic deletion. Before diving into the details, (1) at least inside LinkedIn we don't use this metric at all, and (2) if needed, this count can be derived by requesting the metadata of all topics in the cluster. So I wonder whether it makes sense for us to remove this metric. > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can > be quite large, all seen by the controller. Even if the time spent to check a > single partition is small, the accumulation effect of so many partitions in > the cluster can make the invocation to update metrics quite expensive. One > might argue that maybe the logic for processing each single partition is not > optimized, we checked the CPU percentage of leaf nodes in the profiling > result, and found that inside the loops of collection objects, e.g. the set > of all partitions, no single function dominates the processing. Hence the > large number of the partitions in a cluster is the main contributor to the > slowness of one invocation to update the metrics. > 2. The invocation to update metrics is called many times when the is a high > number of events to be processed by the controller, one invocation after > processing any event. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6753: -- Description: The existing controller code updates metrics after processing every event. This can slow down event processing on the controller tremendously. In one profiling we see that updating metrics takes nearly 100% of the CPU for the controller event processing thread. Specifically the slowness can be attributed to two factors: 1. Each invocation to update the metrics is expensive. Specifically trying to calculate the offline partitions count requires iterating through all the partitions in the cluster to check if the partition is offline; and calculating the preferred replica imbalance count requires iterating through all the partitions in the cluster to check if a partition has a leader other than the preferred leader. In a large cluster, the number of partitions can be quite large, all seen by the controller. Even if the time spent to check a single partition is small, the accumulation effect of so many partitions in the cluster can make the invocation to update metrics quite expensive. One might argue that maybe the logic for processing each single partition is not optimized, we checked the CPU percentage of leaf nodes in the profiling result, and found that inside the loops of collection objects, e.g. the set of all partitions, no single function dominates the processing. Hence the large number of the partitions in a cluster is the main contributor to the slowness of one invocation to update the metrics. 2. The invocation to update metrics is called many times when the is a high number of events to be processed by the controller, one invocation after processing any event. was: The existing controller code updates metrics after processing every event. This can slow down event processing on the controller tremendously. In one profiling we see that updating metrics takes nearly 100% of the CPU for the controller event processing thread. Specifically the slowness can be attributed to two factors: 1. Each invocation to update the metrics is expensive. Specifically trying to calculate the offline partitions count requires iterating through all the partitions in the cluster to check if the partition is offline; and calculating the preferred replica imbalance count requires iterating through all the partitions in the cluster to check if a partition has a leader other than the preferred leader. In a large cluster, the number of partitions can be quite large, all seen by the controller. Even if the time spent to check a single partition is small, the accumulation effect of so many partitions in the cluster can make the invocation to update metrics quite expensive. One might argue that maybe the logic for processing each single partition is not optimized, we checked the CPU percentage of leaf nodes in the profiling result, and found that inside the loops of collection objects, e.g. the set of all partitions, no single function dominates the processing. Hence the large number of the partitions in a cluster is the main contributor to the slowness of one invocation to update the metrics. 2. The invocation to update metrics is called many times when the is a high number of events to be processed by the controller, one invocation after processing any event. The patch that will be submitted tries to fix bullet 2 above, i.e. reducing the number of invocations to update metrics. 
Instead of updating the metrics after processing every event, we only periodically check whether the metrics need to be updated, i.e. once every second. * If, after the previous invocation to update metrics, there are other types of events that changed the controller’s state, then one second later the metrics will be updated. * If, after the previous invocation, there have been no other events, then the call to update metrics can be bypassed. > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Fix For: 2.1.0 > > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partition
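The once-per-second approach described above can be illustrated with a small dirty-flag scheduler: event processing merely marks the controller state as changed, and a periodic task recomputes the expensive metrics only when that flag is set. This is a hedged sketch with illustrative class and method names, not the actual controller patch.

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of the rate-limited metric update: events only mark the state dirty;
// a periodic task recomputes the (expensive) metrics at most once a second and
// skips the work entirely if nothing changed since the last update.
public class PeriodicMetricUpdater {
    private final AtomicBoolean stateChanged = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Called from the event-processing path; cheap, no iteration over partitions.
    public void onControllerEvent() {
        stateChanged.set(true);
    }

    public void start(Runnable recomputeMetrics) {
        scheduler.scheduleAtFixedRate(() -> {
            // Only pay the O(#partitions) cost if some event changed controller state.
            if (stateChanged.getAndSet(false)) {
                recomputeMetrics.run();
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
{code}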
[jira] [Resolved] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model
[ https://issues.apache.org/jira/browse/KAFKA-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang resolved KAFKA-6974. --- Resolution: Won't Fix > Changes the interaction between request handler threads and fetcher threads > into an ASYNC model > --- > > Key: KAFKA-6974 > URL: https://issues.apache.org/jira/browse/KAFKA-6974 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > At LinkedIn, occasionally our clients complain about receiving consant > NotLeaderForPartition exceptions > Investigations: > For one investigated case, the cluster was going through a rolling bounce. > And we saw there was a ~8 minutes delay between an old partition leader > resigning and the new leader becoming active, based on entries of "Broker xxx > handling LeaderAndIsr request" in the state change log. > Our monitoring shows the LeaderAndISR request local time during the incident > went up to ~4 minutes. > Explanations: > One possible explanation of the ~8 minutes of delay is: > During controlled shutdown of a broker, the partitions whose leaders lie on > the shutting down broker need to go through leadership transitions. And the > controller process partitions in batches with each batch having > config.controlledShutdownPartitionBatchSize partitions, e.g. 100. > If the 1st LeaderAndISR sent to a new leader broker takes too long, e.g. 4 > minutes, then the subsequent LeaderAndISR requests can have an accumulated > delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that > subsequent LeaderAndISR requests are blocked in a muted channel, given only > one LeaderAndISR request can be processed at a time with a > maxInFlightRequestsPerConnection setting of 1. When that happens, no existing > metric would show the total delay of 8 or 12 minutes for muted requests. > Now the question is why it took ~4 minutes for the the 1st LeaderAndISR > request to finish. > Explanation for the ~4 minutes of local time for LeaderAndISR request: > During processing of an LeaderAndISR request, the request handler thread > needs to add partitions to or remove partitions from partitionStates field of > the ReplicaFetcherThread, also shutdown idle fetcher threads by checking the > size of the partitionStates field. On the other hand, background fetcher > threads need to iterate through all the partitions in partitionStates in > order to build fetch request, and process fetch responses. The > synchronization between request handler thread and the fetcher threads is > done through a partitionMapLock. > Specifically, the fetcher threads may acquire the partitionMapLock, and then > calls the following functions for processing the fetch response > (1) processPartitionData, which in turn calls > (2) Replica.maybeIncrementLogStartOffset, which calls > (3) Log.maybeIncrementLogStartOffset, which calls > (4) LeaderEpochCache.clearAndFlushEarliest. > Now two factors contribute to the long holding of the partitionMapLock, > 1. function (4) above entails calling sync() to make sure data gets > persistent to the disk, which may potentially have a long latency > 2. All the 4 functions above can potentially be called for each partition in > the fetch response, multiplying the sync() latency by a factor of n. 
> The end result is that the request handler thread got blocked for a long time > trying to acquire the partitionMapLock of some fetcher inside > AbstractFetcherManager.shutdownIdleFetcherThreads since checking each > fetcher's partitionCount requires getting the partitionMapLock. > In our testing environment, we reproduced the problem and confirmed the > explanation above with a request handler thread getting blocked for 10 > seconds trying to acquire the partitionMapLock of one particular fetcher > thread, while there are many log entries showing "Incrementing log start > offset of partition..." > Proposed change: > We propose to change the interaction between the request handler threads and > the fetcher threads to an ASYNC model by using an event queue. All requests > to add or remove partitions, or shutdown idle fetcher threads are modeled as > items in the event queue. And only the fetcher threads can take items out of > the event queue and actually process them. > In the new ASYNC model, in order to be able to process an infinite sequence > of FetchRequests, a fetcher thread initially has one FetchRequest, and after > it's done processing one FetchRequest, it enqueues one more into its own > event queue. > Also since the current AbstractFetcherThread logic is inherited by both the > replica-fetcher-threads and the consumer-fetcher-threads for the old > consumer, and the latter has been deprecated,
[jira] [Created] (KAFKA-7180) In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has joined the ISR before shutting down server2
Lucas Wang created KAFKA-7180: - Summary: In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has joined the ISR before shutting down server2 Key: KAFKA-7180 URL: https://issues.apache.org/jira/browse/KAFKA-7180 Project: Kafka Issue Type: Bug Reporter: Lucas Wang Assignee: Lucas Wang In the testHWCheckpointWithFailuresSingleLogSegment method, the test logic is: 1. shut down server1 and then capture the leadership of a partition in the variable "leader", which should be server2; 2. shut down server2 and wait until the leadership has changed to a broker other than server2 through the line waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = Some(leader)). However, when we execute step 2 and shut down server2, it is possible that server1 has not caught up with the partition and has not joined the ISR. With unclean leader election turned off, the leadership cannot be transferred to server1, so the condition waited on in step 2 is never met. The obvious fix is to wait until server1 has joined the ISR before shutting down server2. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
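The proposed wait can be expressed as a simple ISR poll; the sketch below uses the Java AdminClient purely for illustration (the real test would use the test harness's own polling utilities, and the method name, broker id, and timeout are assumptions): block until the restarted broker shows up in the partition's ISR, and only then shut down the current leader.

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.Collections;

// Sketch: block until a given broker appears in the ISR of topic/partition,
// so leadership can legally move to it once the current leader is shut down.
public class IsrWait {
    public static void waitUntilInIsr(AdminClient admin, String topic, int partition,
                                      int brokerId, long timeoutMs) throws Exception {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            TopicPartitionInfo info = admin.describeTopics(Collections.singleton(topic))
                    .all().get().get(topic).partitions().get(partition);
            if (info.isr().stream().map(Node::id).anyMatch(id -> id == brokerId)) {
                return; // broker has caught up and joined the ISR
            }
            Thread.sleep(100);
        }
        throw new AssertionError("Broker " + brokerId + " did not join the ISR in time");
    }
}
{code}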
[jira] [Assigned] (KAFKA-7162) Flaky unit tests caused by record creation timestamps differ from validation time by more than timestampDiffMaxMs
[ https://issues.apache.org/jira/browse/KAFKA-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-7162: - Assignee: Lucas Wang > Flaky unit tests caused by record creation timestamps differ from validation > time by more than timestampDiffMaxMs > - > > Key: KAFKA-7162 > URL: https://issues.apache.org/jira/browse/KAFKA-7162 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > While running gradle unit tests, we found the test method > LogValidatorTest.testCompressedV1 can fail sometimes. Upon investigation, it > turns out the test method uses one set of timestamps, say t0, t1 and t2, for > the records, while using a separate timestamp, say t3, for the "now" > parameter when invoking the LogValidator.validateMessagesAndAssignOffsets > method. The validateMessagesAndAssignOffsets validation method also takes a > parameter timestampDiffMaxMs=1 second, that specifies the maximum allowed > time different between t3 and the timestamps in records, i.e. t0, t1, and t2. > While running unit tests, especially when multiple tests are run > simultaneously, there is no guarantee that the time difference between t3 and > t0 is within 1 second, causing the test method to flaky sometimes. Many other > test methods in the LogValidatorTest can suffer from the same problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7162) Flaky unit tests caused by record creation timestamps differ from validation time by more than timestampDiffMaxMs
Lucas Wang created KAFKA-7162: - Summary: Flaky unit tests caused by record creation timestamps differ from validation time by more than timestampDiffMaxMs Key: KAFKA-7162 URL: https://issues.apache.org/jira/browse/KAFKA-7162 Project: Kafka Issue Type: Bug Reporter: Lucas Wang While running gradle unit tests, we found that the test method LogValidatorTest.testCompressedV1 can fail sometimes. Upon investigation, it turns out the test method uses one set of timestamps, say t0, t1 and t2, for the records, while using a separate timestamp, say t3, for the "now" parameter when invoking the LogValidator.validateMessagesAndAssignOffsets method. The validation method also takes a parameter timestampDiffMaxMs=1 second, which specifies the maximum allowed time difference between t3 and the timestamps in the records, i.e. t0, t1, and t2. While running unit tests, especially when multiple tests run simultaneously, there is no guarantee that the time difference between t3 and t0 is within 1 second, causing the test method to be flaky. Many other test methods in LogValidatorTest can suffer from the same problem. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
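The failure mode and the likely fix can be illustrated with a stand-in for the validator's timestamp check: when the record timestamps and the "now" passed to validation come from separate clock readings, a slow or heavily loaded test run can push them more than timestampDiffMaxMs apart, whereas sharing a single reading (or a mock clock) makes the difference deterministic. The class below is purely illustrative and does not call LogValidator.

{code:java}
// Illustration only: the flaky pattern vs. a stable one. In the real test the
// "validate" step is LogValidator.validateMessagesAndAssignOffsets; here it is a
// stand-in that just applies the timestampDiffMaxMs check.
public class TimestampDiffExample {
    static final long TIMESTAMP_DIFF_MAX_MS = 1000L;

    static boolean withinAllowedDiff(long recordTimestamp, long now) {
        return Math.abs(now - recordTimestamp) <= TIMESTAMP_DIFF_MAX_MS;
    }

    public static void main(String[] args) throws InterruptedException {
        // Flaky: record timestamps and "now" come from separate clock readings,
        // and load on the machine can push them more than 1 second apart.
        long t0 = System.currentTimeMillis();
        Thread.sleep(5); // imagine this being seconds under heavy parallel test load
        long t3 = System.currentTimeMillis();
        System.out.println("separate readings ok? " + withinAllowedDiff(t0, t3));

        // Stable: both sides share one clock reading, so the difference is always 0.
        long now = System.currentTimeMillis();
        System.out.println("shared reading ok?   " + withinAllowedDiff(now, now));
    }
}
{code}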
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520899#comment-16520899 ] Lucas Wang commented on KAFKA-7040: --- Hi [~apovzner] Sorry for the delayed response. Regarding Case 1 above, I did some testing with min.insync.replicas == 2 and unclean leader election DISABLED, to check if a real message loss can happen https://github.com/apache/kafka/compare/trunk...gitlw:test_incorrect_truncation_causing_message_loss , and it turns out the answer is yes. On the high level if AFTER the truncation in step 5, broker0 again becomes the leader, and broker1 starts fetching from broker0 who has fewer messages, then the message will forever be lost. > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514422#comment-16514422 ] Lucas Wang commented on KAFKA-7040: --- Hi [~apovzner] Thanks for your comment. I didn't see an actual message loss in a real cluster, and this is purely based on reasoning and unit testing. In general, I think if truncation happens below the HW, it's possible for a message loss to happen. For example, after the truncation at step 5, leadership gets changed back to broker 0, and broker 1 becomes the follower. Then the message at offset 100 will forever be lost. > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508907#comment-16508907 ] Lucas Wang commented on KAFKA-7040: --- To be more specific, I think the following sequence of events may cause a truncation below HW. Say currently both broker0 and broker1 have finished processing of a LeaderAndISR request with leader being broker1, and leader epoch 10, and both of them have 100 messages in their respective log (with the largest offset 99, and LEO of 100). 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request with leader epoch 10 to broker1, and broker1 replies with a LEO of 100 given it's the latest leader epoch. Before the replica fetcher on broker0 acquires the AbstractFetcherThread.partitionMapLock and processes the LeaderEpoch response, it goes through steps 2-4 first. 2. A LeaderAndISR request causes broker0 to become the leader for one partition t1p0, which in turn will remove the partition t1p0 from the replica fetcher thread 3. Broker0 accepts one message at offset 100 from a producer, and the message gets replicated to broker1, causing the HW on broker0 to go up to 100. 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 to become the follower for partition t1p0. This will cause the partition t1p0 to be added back to the replica fetcher thread on broker0. 5. The replica fetcher thread on broker0 processes the LeaderEpoch response received in step 1, and truncates the accepted message with offset 100 in step3. > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
[ https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508867#comment-16508867 ] Lucas Wang commented on KAFKA-7040: --- [~lindong] Thanks for pointing out the truncation may happen below high watermark. Also your suggested change seems to work by remembering that some partition state has been used, so that we can distinguish between different generations of the partition state. > The replica fetcher thread may truncate accepted messages during multiple > fast leadership transitions > - > > Key: KAFKA-7040 > URL: https://issues.apache.org/jira/browse/KAFKA-7040 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Wang >Priority: Minor > > Problem Statement: > Consider the scenario where there are two brokers, broker0, and broker1, and > there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as > the leader and broker0 as the follower. The following sequence of events > happened on broker0 > 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to > broker1, and awaits to get the response > 2. A LeaderAndISR request causes broker0 to become the leader for one > partition t1p0, which in turn will remove the partition t1p0 from the replica > fetcher thread > 3. Broker0 accepts some messages from a producer > 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and > broker0 to become the follower for partition t1p0. This will cause the > partition t1p0 to be added back to the replica fetcher thread on broker0. > 5. The replica fetcher thread on broker0 receives a response for the > LeaderEpoch request issued in step 1, and truncates the accepted messages in > step3. > The issue can be reproduced with the test from > https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea > [1] Initially we set up broker0 to be the follower of two partitions instead > of just one, to avoid the shutting down of the replica fetcher thread when it > becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
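One way to realize the suggestion above, sketched here with illustrative names rather than the actual Kafka change: tag the fetcher's per-partition state with a generation that is bumped every time the partition is re-added, remember the generation when the LeaderEpoch request is issued, and ignore the response if the generation has since moved on.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: a per-partition generation counter lets the fetcher detect that a
// LeaderEpoch response belongs to an older incarnation of the partition state
// (the partition was removed and re-added in between) and skip the truncation.
public class PartitionFetchStates {
    private static final class State {
        final long generation;
        State(long generation) { this.generation = generation; }
    }

    private final Map<String, State> states = new ConcurrentHashMap<>();
    private final Map<String, Long> generations = new ConcurrentHashMap<>();

    // Called when a LeaderAndIsr request adds the partition to this fetcher.
    public long addPartition(String topicPartition) {
        long gen = generations.merge(topicPartition, 1L, Long::sum);
        states.put(topicPartition, new State(gen));
        return gen;
    }

    // Called when the partition is removed (e.g. this broker became the leader).
    public void removePartition(String topicPartition) {
        states.remove(topicPartition);
    }

    // Called when the LeaderEpoch response finally arrives: truncate only if the
    // partition state is still the generation the request was issued against.
    public boolean shouldTruncate(String topicPartition, long generationAtRequestTime) {
        State s = states.get(topicPartition);
        return s != null && s.generation == generationAtRequestTime;
    }
}
{code}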
[jira] [Created] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions
Lucas Wang created KAFKA-7040: - Summary: The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions Key: KAFKA-7040 URL: https://issues.apache.org/jira/browse/KAFKA-7040 Project: Kafka Issue Type: Bug Reporter: Lucas Wang Problem Statement: Consider the scenario where there are two brokers, broker0 and broker1, and two partitions "t1p0" and "t1p1"[1], both of which have broker1 as the leader and broker0 as the follower. The following sequence of events happens on broker0: 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to broker1 and waits for the response. 2. A LeaderAndISR request causes broker0 to become the leader for partition t1p0, which in turn removes the partition t1p0 from the replica fetcher thread. 3. Broker0 accepts some messages from a producer. 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 to become the follower for partition t1p0. This causes the partition t1p0 to be added back to the replica fetcher thread on broker0. 5. The replica fetcher thread on broker0 receives a response for the LeaderEpoch request issued in step 1, and truncates the messages accepted in step 3. The issue can be reproduced with the test from https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea [1] Initially we set up broker0 to be the follower of two partitions instead of just one, to avoid shutting down the replica fetcher thread when it becomes idle. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-4453: -- Description: Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue. They all have the same priority. So a backlog of requests ahead of the controller request will delay the processing of controller requests. This causes requests infront of the controller request to get processed based on stale state. Side effects may include giving clients stale metadata\[1\], rejecting ProduceRequests and FetchRequests\[2\], and data loss (for some unofficial\[3\] definition of data loss in terms of messages beyond the high watermark)\[4\]. We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued up requests, so requests can get processed with up-to-date state. \[1\] Say a client's MetadataRequest is sitting infront of a controller's UpdateMetadataRequest on a given broker's request queue. Suppose the MetadataRequest is for a topic whose partitions have recently undergone leadership changes and that these leadership changes are being broadcasted from the controller in the later UpdateMetadataRequest. Today the broker processes the MetadataRequest before processing the UpdateMetadataRequest, meaning the metadata returned to the client will be stale. The client will waste a roundtrip sending requests to the stale partition leader, get a NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the topic metadata again. \[2\] Clients can issue ProduceRequests to the wrong broker based on stale metadata, causing rejected ProduceRequests. Based on how long the client acts based on the stale metadata, the impact may or may not be visible to a producer application. If the number of rejected ProduceRequests does not exceed the max number of retries, the producer application would not be impacted. On the other hand, if the retries are exhausted, the failed produce will be visible to the producer application. \[3\] The official definition of data loss in kafka is when we lose a "committed" message. A message is considered "committed" when all in sync replicas for that partition have applied it to their log. \[4\] Say a number of ProduceRequests are sitting infront of a controller's LeaderAndIsrRequest on a given broker's request queue. Suppose the ProduceRequests are for partitions whose leadership has recently shifted out from the current broker to another broker in the replica set. Today the broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning the ProduceRequests are getting processed on the former partition leader. As part of becoming a follower for a partition, the broker truncates the log to the high-watermark. With weaker ack settings such as acks=1, the leader may successfully write to its own log, respond to the user with a success, process the LeaderAndIsrRequest making the broker a follower of the partition, and truncate the log to a point before the user's produced messages. So users have a false sense that their produce attempt succeeded while in reality their messages got erased. While technically part of what they signed up for with acks=1, it can still come as a surprise. was: Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue. They all have the same priority. 
So a backlog of requests ahead of the controller request will delay the processing of controller requests. This causes requests infront of the controller request to get processed based on stale state. Side effects may include giving clients stale metadata\[1\], rejecting ProduceRequests and FetchRequests, and data loss (for some unofficial\[2\] definition of data loss in terms of messages beyond the high watermark)\[3\]. We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued up requests, so requests can get processed with up-to-date state. \[1\] Say a client's MetadataRequest is sitting infront of a controller's UpdateMetadataRequest on a given broker's request queue. Suppose the MetadataRequest is for a topic whose partitions have recently undergone leadership changes and that these leadership changes are being broadcasted from the controller in the later UpdateMetadataRequest. Today the broker processes the MetadataRequest before processing the UpdateMetadataRequest, meaning the metadata returned to the client will be stale. The client will waste a roundtrip sending requests to the stale partition leader, get a NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the topic metadata again. \[2\] The official defini
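The prioritization idea can be sketched as a request channel with two queues, where handler threads always drain the controller queue before the data-plane queue. This is only an illustration of the queueing policy under assumed names; it is not necessarily the implementation that was eventually adopted for this ticket.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch: a request channel with a separate queue for controller requests
// (LeaderAndIsr, UpdateMetadata, StopReplica). Handlers always drain the
// controller queue first, so data-plane backlogs cannot delay them.
public class PrioritizedRequestChannel<R> {
    private final BlockingQueue<R> controllerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<R> dataQueue = new LinkedBlockingQueue<>();

    public void sendControllerRequest(R request) { controllerQueue.add(request); }
    public void sendDataRequest(R request)       { dataQueue.add(request); }

    // Called by request handler threads: controller requests win; otherwise
    // poll the data queue briefly and loop so new controller requests are seen.
    public R receive() throws InterruptedException {
        while (true) {
            R controllerRequest = controllerQueue.poll();
            if (controllerRequest != null) {
                return controllerRequest;
            }
            R dataRequest = dataQueue.poll(300, TimeUnit.MILLISECONDS);
            if (dataRequest != null) {
                return dataRequest;
            }
        }
    }
}
{code}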
[jira] [Created] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model
Lucas Wang created KAFKA-6974: - Summary: Changes the interaction between request handler threads and fetcher threads into an ASYNC model Key: KAFKA-6974 URL: https://issues.apache.org/jira/browse/KAFKA-6974 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Problem Statement: At LinkedIn, occasionally our clients complain about receiving constant NotLeaderForPartition exceptions. Investigations: For one investigated case, the cluster was going through a rolling bounce, and we saw there was an ~8 minute delay between an old partition leader resigning and the new leader becoming active, based on entries of "Broker xxx handling LeaderAndIsr request" in the state change log. Our monitoring shows the LeaderAndISR request local time during the incident went up to ~4 minutes. Explanations: One possible explanation of the ~8 minutes of delay is: during controlled shutdown of a broker, the partitions whose leaders lie on the shutting down broker need to go through leadership transitions, and the controller processes partitions in batches, with each batch having config.controlledShutdownPartitionBatchSize partitions, e.g. 100. If the 1st LeaderAndISR request sent to a new leader broker takes too long, e.g. 4 minutes, then the subsequent LeaderAndISR requests can have an accumulated delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that subsequent LeaderAndISR requests are blocked in a muted channel, given that only one LeaderAndISR request can be processed at a time with a maxInFlightRequestsPerConnection setting of 1. When that happens, no existing metric would show the total delay of 8 or 12 minutes for muted requests. Now the question is why it took ~4 minutes for the 1st LeaderAndISR request to finish. Explanation for the ~4 minutes of local time for the LeaderAndISR request: During processing of a LeaderAndISR request, the request handler thread needs to add partitions to or remove partitions from the partitionStates field of the ReplicaFetcherThread, and also shut down idle fetcher threads by checking the size of the partitionStates field. On the other hand, background fetcher threads need to iterate through all the partitions in partitionStates in order to build fetch requests and process fetch responses. The synchronization between the request handler thread and the fetcher threads is done through a partitionMapLock. Specifically, the fetcher threads may acquire the partitionMapLock and then call the following functions for processing the fetch response: (1) processPartitionData, which in turn calls (2) Replica.maybeIncrementLogStartOffset, which calls (3) Log.maybeIncrementLogStartOffset, which calls (4) LeaderEpochCache.clearAndFlushEarliest. Now two factors contribute to the long holding of the partitionMapLock: 1. function (4) above entails calling sync() to make sure data gets persisted to disk, which may potentially have a long latency; 2. all 4 functions above can potentially be called for each partition in the fetch response, multiplying the sync() latency by a factor of n. The end result is that the request handler thread got blocked for a long time trying to acquire the partitionMapLock of some fetcher inside AbstractFetcherManager.shutdownIdleFetcherThreads, since checking each fetcher's partitionCount requires getting the partitionMapLock.
In our testing environment, we reproduced the problem and confirmed the explanation above, with a request handler thread getting blocked for 10 seconds trying to acquire the partitionMapLock of one particular fetcher thread, while there were many log entries showing "Incrementing log start offset of partition..." Proposed change: We propose to change the interaction between the request handler threads and the fetcher threads to an ASYNC model by using an event queue. All requests to add or remove partitions, or to shut down idle fetcher threads, are modeled as items in the event queue, and only the fetcher threads can take items out of the event queue and actually process them. In the new ASYNC model, in order to be able to process an infinite sequence of FetchRequests, a fetcher thread initially has one FetchRequest, and after it's done processing one FetchRequest, it enqueues one more into its own event queue. Also, since the current AbstractFetcherThread logic is inherited by both the replica-fetcher-threads and the consumer-fetcher-threads for the old consumer, and the latter has been deprecated, we plan to implement the ASYNC model with a clean-slate approach and only support the replica-fetcher-threads, in order to make the code cleaner. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
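A minimal sketch of the proposed ASYNC model, with illustrative class names rather than the actual patch: request handler threads only enqueue add/remove/shutdown events, the fetcher thread is the sole owner of its partition set (so no partitionMapLock is needed), and the fetcher keeps itself busy by re-enqueuing a Fetch event after finishing one.

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of the ASYNC model: request handler threads only enqueue events; the
// fetcher thread is the sole owner of its partition set, so no lock is needed.
// After each Fetch it enqueues another Fetch to keep fetching indefinitely.
public class AsyncFetcher implements Runnable {
    interface Event {}
    static final class AddPartitions implements Event { final List<String> partitions; AddPartitions(List<String> p) { partitions = p; } }
    static final class RemovePartitions implements Event { final List<String> partitions; RemovePartitions(List<String> p) { partitions = p; } }
    static final class Fetch implements Event {}
    static final class Shutdown implements Event {}

    private final BlockingQueue<Event> queue = new LinkedBlockingQueue<>();
    private final Set<String> partitionStates = new HashSet<>(); // touched only by the fetcher thread

    public AsyncFetcher() { queue.add(new Fetch()); } // seed the infinite fetch loop

    public void enqueue(Event e) { queue.add(e); }    // called by request handler threads

    @Override public void run() {
        try {
            while (true) {
                Event e = queue.take();
                if (e instanceof Shutdown) {
                    return;
                } else if (e instanceof AddPartitions) {
                    partitionStates.addAll(((AddPartitions) e).partitions);
                } else if (e instanceof RemovePartitions) {
                    partitionStates.removeAll(((RemovePartitions) e).partitions);
                } else if (e instanceof Fetch) {
                    // build a fetch request over partitionStates, send it, process the
                    // response (possibly slow, e.g. flushing the leader-epoch cache) ...
                    queue.add(new Fetch()); // ... then re-enqueue to continue fetching
                }
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}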
[jira] [Assigned] (KAFKA-4453) add request prioritization
[ https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-4453: - Assignee: Lucas Wang (was: Onur Karaman) > add request prioritization > -- > > Key: KAFKA-4453 > URL: https://issues.apache.org/jira/browse/KAFKA-4453 > Project: Kafka > Issue Type: Bug >Reporter: Onur Karaman >Assignee: Lucas Wang >Priority: Major > > Today all requests (client requests, broker requests, controller requests) to > a broker are put into the same queue. They all have the same priority. So a > backlog of requests ahead of the controller request will delay the processing > of controller requests. This causes requests infront of the controller > request to get processed based on stale state. > Side effects may include giving clients stale metadata\[1\], rejecting > ProduceRequests and FetchRequests, and data loss (for some unofficial\[2\] > definition of data loss in terms of messages beyond the high watermark)\[3\]. > We'd like to minimize the number of requests processed based on stale state. > With request prioritization, controller requests get processed before regular > queued up requests, so requests can get processed with up-to-date state. > \[1\] Say a client's MetadataRequest is sitting infront of a controller's > UpdateMetadataRequest on a given broker's request queue. Suppose the > MetadataRequest is for a topic whose partitions have recently undergone > leadership changes and that these leadership changes are being broadcasted > from the controller in the later UpdateMetadataRequest. Today the broker > processes the MetadataRequest before processing the UpdateMetadataRequest, > meaning the metadata returned to the client will be stale. The client will > waste a roundtrip sending requests to the stale partition leader, get a > NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the > topic metadata again. > \[2\] The official definition of data loss in kafka is when we lose a > "committed" message. A message is considered "committed" when all in sync > replicas for that partition have applied it to their log. > \[3\] Say a number of ProduceRequests are sitting infront of a controller's > LeaderAndIsrRequest on a given broker's request queue. Suppose the > ProduceRequests are for partitions whose leadership has recently shifted out > from the current broker to another broker in the replica set. Today the > broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning > the ProduceRequests are getting processed on the former partition leader. As > part of becoming a follower for a partition, the broker truncates the log to > the high-watermark. With weaker ack settings such as acks=1, the leader may > successfully write to its own log, respond to the user with a success, > process the LeaderAndIsrRequest making the broker a follower of the > partition, and truncate the log to a point before the user's produced > messages. So users have a false sense that their produce attempt succeeded > while in reality their messages got erased. While technically part of what > they signed up for with acks=1, it can still come as a surprise. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6753) Speed up event processing on the controller
Lucas Wang created KAFKA-6753: - Summary: Speed up event processing on the controller Key: KAFKA-6753 URL: https://issues.apache.org/jira/browse/KAFKA-6753 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png The existing controller code updates metrics after processing every event. This can slow down event processing on the controller tremendously. In one profiling run we saw that updating metrics takes nearly 100% of the CPU for the controller event processing thread. Specifically, the slowness can be attributed to two factors: 1. Each invocation to update the metrics is expensive. Specifically, calculating the offline partitions count requires iterating through all the partitions in the cluster to check if the partition is offline, and calculating the preferred replica imbalance count requires iterating through all the partitions in the cluster to check if a partition has a leader other than the preferred leader. In a large cluster, the number of partitions can be quite large, all seen by the controller. Even if the time spent to check a single partition is small, the accumulated effect of so many partitions in the cluster can make the invocation to update metrics quite expensive. One might argue that maybe the logic for processing each single partition is not optimized; however, we checked the CPU percentage of leaf nodes in the profiling result and found that, inside the loops over collection objects, e.g. the set of all partitions, no single function dominates the processing. Hence the large number of partitions in a cluster is the main contributor to the slowness of one invocation to update the metrics. 2. The invocation to update metrics is called many times when there is a high number of events to be processed by the controller, one invocation after processing each event. The patch that will be submitted tries to fix bullet 2 above, i.e. reducing the number of invocations to update metrics. Instead of updating the metrics after processing every event, we only periodically check whether the metrics need to be updated, i.e. once every second. * If, after the previous invocation to update metrics, there are other types of events that changed the controller’s state, then one second later the metrics will be updated. * If, after the previous invocation, there have been no other events, then the call to update metrics can be bypassed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6753: -- Attachment: Screen Shot 2018-04-04 at 7.08.55 PM.png > Speed up event processing on the controller > > > Key: KAFKA-6753 > URL: https://issues.apache.org/jira/browse/KAFKA-6753 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png > > > The existing controller code updates metrics after processing every event. > This can slow down event processing on the controller tremendously. In one > profiling we see that updating metrics takes nearly 100% of the CPU for the > controller event processing thread. Specifically the slowness can be > attributed to two factors: > 1. Each invocation to update the metrics is expensive. Specifically trying to > calculate the offline partitions count requires iterating through all the > partitions in the cluster to check if the partition is offline; and > calculating the preferred replica imbalance count requires iterating through > all the partitions in the cluster to check if a partition has a leader other > than the preferred leader. In a large cluster, the number of partitions can > be quite large, all seen by the controller. Even if the time spent to check a > single partition is small, the accumulation effect of so many partitions in > the cluster can make the invocation to update metrics quite expensive. One > might argue that maybe the logic for processing each single partition is not > optimized, we checked the CPU percentage of leaf nodes in the profiling > result, and found that inside the loops of collection objects, e.g. the set > of all partitions, no single function dominates the processing. Hence the > large number of the partitions in a cluster is the main contributor to the > slowness of one invocation to update the metrics. > 2. The invocation to update metrics is called many times when the is a high > number of events to be processed by the controller, one invocation after > processing any event. > The patch that will be submitted tries to fix bullet 2 above, i.e. reducing > the number of invocations to update metrics. Instead of updating the metrics > after processing any event, we only periodically check if the metrics needs > to be updated, i.e. once every second. > * If after the previous invocation to update metrics, there are other types > of events that changed the controller’s state, then one second later the > metrics will be updated. > * If after the previous invocation, there has been no other types of events, > then the call to update metrics can be bypassed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info
[ https://issues.apache.org/jira/browse/KAFKA-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang resolved KAFKA-6652. --- Resolution: Won't Fix > The controller should log failed attempts to transition a replica to > OfflineReplica state if there is no leadership info > > > Key: KAFKA-6652 > URL: https://issues.apache.org/jira/browse/KAFKA-6652 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > In certain conditions, the controller's attempt to transition a replica to > OfflineReplica state could fail because there is no leadership info, e.g. the > condition described in > [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that > happens, there should be logs to indicate the failed state transitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
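A minimal sketch of the logging this ticket asks for, with hypothetical TopicPartition/Replica stand-ins rather than the controller's real state machine types: when the leadership lookup returns nothing, the failed OfflineReplica transition is logged instead of passing silently.
{code:scala}
// Stand-in types for illustration only.
final case class TopicPartition(topic: String, partition: Int)
final case class Replica(tp: TopicPartition, brokerId: Int)

object OfflineTransitionLoggingSketch {
  // Stand-in for a leadership lookup (e.g. the controller context's partition
  // leadership map); here it deliberately has no entry for the partition.
  def leadershipInfo(tp: TopicPartition): Option[Int] = None

  def transitionToOffline(replica: Replica): Unit =
    leadershipInfo(replica.tp) match {
      case Some(leader) =>
        println(s"Transitioning $replica to OfflineReplica (current leader: $leader)")
      case None =>
        // The log line this ticket asks for.
        println(s"Failed to transition $replica to OfflineReplica: no leadership info for ${replica.tp}")
    }

  def main(args: Array[String]): Unit =
    transitionToOffline(Replica(TopicPartition("a0", 0), brokerId = 2))
}
{code}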
[jira] [Resolved] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang resolved KAFKA-6612. --- Resolution: Fixed > Added logic to prevent increasing partition counts during topic deletion > > > Key: KAFKA-6612 > URL: https://issues.apache.org/jira/browse/KAFKA-6612 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > Problem: trying to increase the partition count of a topic while the topic > deletion is in progress can cause the topic to never be deleted. > In the current code base, if a topic deletion is still in progress and the > partition count is increased, > the new partition and its replica assignment will be created on ZooKeeper as data > of the path /brokers/topics/. > Upon detecting the change, the controller sees the topic is being deleted, > and therefore ignores the partition change, so the zk path > /brokers/topics//partitions/ will NOT be created. > If a controller switch happens next, the added partition will be detected by > the new controller and stored in the > controllerContext.partitionReplicaAssignment. The new controller then tries > to delete the topic by first transitioning its replicas to OfflineReplica. > However, the transition to OfflineReplica state will NOT succeed since there > is no leader for the partition. Since the only state change path for a > replica to be successfully deleted is OfflineReplica -> > ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter > the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
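A minimal sketch of the guard described in this ticket, using a hypothetical topicsToBeDeleted set and addPartitions handler rather than the real admin code path: a partition-count increase is rejected while the topic is still queued for deletion.
{code:scala}
// Reject a partition-count increase while the topic is queued for deletion, so
// the deletion state machine (OfflineReplica -> ReplicaDeletionStarted ->
// ReplicaDeletionSuccessful) never sees partitions it cannot drive to completion.
object PartitionIncreaseGuard {
  // Stand-in for the controller's view of topics pending deletion.
  private val topicsToBeDeleted: Set[String] = Set("a0")

  def addPartitions(topic: String, newCount: Int): Either[String, Int] =
    if (topicsToBeDeleted.contains(topic))
      Left(s"Topic $topic is being deleted; the partition count cannot be increased")
    else
      Right(newCount)

  def main(args: Array[String]): Unit = {
    println(addPartitions("a0", 20)) // Left(...): rejected, deletion in progress
    println(addPartitions("a1", 20)) // Right(20): allowed
  }
}
{code}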
[jira] [Updated] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info
[ https://issues.apache.org/jira/browse/KAFKA-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6652: -- Description: In certain conditions, the controller's attempt to transition a replica to OfflineReplica state could fail because there is no leadership info, e.g. the condition described in [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that happens, there should be logs to indicate the failed state transitions. (was: In certain conditions, the controller's attempt to transition a replica to OfflineReplica state could fail, e.g. the condition described in [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that happens, there should be logs to indicate the failed state transitions.) > The controller should log failed attempts to transition a replica to > OfflineReplica state if there is no leadership info > > > Key: KAFKA-6652 > URL: https://issues.apache.org/jira/browse/KAFKA-6652 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > In certain conditions, the controller's attempt to transition a replica to > OfflineReplica state could fail because there is no leadership info, e.g. the > condition described in > [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that > happens, there should be logs to indicate the failed state transitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info
[ https://issues.apache.org/jira/browse/KAFKA-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6652: -- Summary: The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info (was: The controller should log failed attempts to transition a replica to OfflineReplica state) > The controller should log failed attempts to transition a replica to > OfflineReplica state if there is no leadership info > > > Key: KAFKA-6652 > URL: https://issues.apache.org/jira/browse/KAFKA-6652 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > In certain conditions, the controller's attempt to transition a replica to > OfflineReplica state could fail, e.g. the condition described in > [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that > happens, there should be logs to indicate the failed state transitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state
Lucas Wang created KAFKA-6652: - Summary: The controller should log failed attempts to transition a replica to OfflineReplica state Key: KAFKA-6652 URL: https://issues.apache.org/jira/browse/KAFKA-6652 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang In certain conditions, the controller's attempt to transition a replica to OfflineReplica state could fail, e.g. the condition described in [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that happens, there should be logs to indicate the failed state transitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6650) The controller should be able to handle a partially deleted topic
Lucas Wang created KAFKA-6650: - Summary: The controller should be able to handle a partially deleted topic Key: KAFKA-6650 URL: https://issues.apache.org/jira/browse/KAFKA-6650 Project: Kafka Issue Type: Bug Reporter: Lucas Wang Assignee: Lucas Wang A previous controller could have deleted some partitions of a topic from ZK, but not all partitions, and then died. In that case, the new controller should be able to handle the partially deleted topic, and finish the deletion. In the current code base, if there is no leadership info for a replica's partition, the transition to OfflineReplica state for the replica will fail. Afterwards the transition to ReplicaDeletionStarted will fail as well since the only valid previous state for ReplicaDeletionStarted is OfflineReplica. Furthermore, it means the topic deletion will never finish. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
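One possible way a new controller could detect such a partially deleted topic, sketched with hypothetical stand-ins for the context loaded from ZooKeeper on failover (an illustration of the detection idea, not the actual fix): a topic marked for deletion where only some partitions still have leadership info is flagged so deletion can be resumed and finished.
{code:scala}
// Stand-in type for illustration only.
final case class TopicPartition(topic: String, partition: Int)

object PartialDeletionSketch {
  val topicsMarkedForDeletion: Set[String] = Set("t0")
  val partitionAssignment: Map[TopicPartition, Seq[Int]] = Map(
    TopicPartition("t0", 0) -> Seq(1, 2),
    TopicPartition("t0", 1) -> Seq(1, 2))
  // Partition t0-1 was already wiped by the previous controller before it died.
  val leadershipInfo: Map[TopicPartition, Int] = Map(TopicPartition("t0", 0) -> 1)

  // A topic is partially deleted if it is marked for deletion and some, but not
  // all, of its partitions have already lost their leadership info.
  def partiallyDeletedTopics: Set[String] =
    topicsMarkedForDeletion.filter { topic =>
      val partitions = partitionAssignment.keys.filter(_.topic == topic)
      partitions.exists(tp => !leadershipInfo.contains(tp)) &&
        partitions.exists(tp => leadershipInfo.contains(tp))
    }

  def main(args: Array[String]): Unit =
    println(s"Partially deleted topics to resume: $partiallyDeletedTopics")
}
{code}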
[jira] [Updated] (KAFKA-6630) Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller
[ https://issues.apache.org/jira/browse/KAFKA-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6630: -- Summary: Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller (was: Speed up the processing of StopReplicaResponse events on the controller) > Speed up the processing of TopicDeletionStopReplicaResponseReceived events on > the controller > > > Key: KAFKA-6630 > URL: https://issues.apache.org/jira/browse/KAFKA-6630 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > Problem Statement: > We find in a large cluster with many partition replicas, it takes a long time > to successfully delete a topic. > Root cause: > Further analysis shows that for a topic with N replicas, the controller > receives all the N StopReplicaResponses from brokers within a short time, > however sequentially handling all the N > TopicDeletionStopReplicaResponseReceived events one by one takes a long time. > Specifically the functions triggered while handling every single > TopicDeletionStopReplicaResponseReceived event include: > TopicDeletionStopReplicaResponseReceived.process calls > TopicDeletionManager.completeReplicaDeletion, which calls > TopicDeletionManager.resumeDeletions, which calls several inefficient > functions. > The inefficient functions called inside TopicDeletionManager.resumeDeletions > include > ReplicaStateMachine.areAllReplicasForTopicDeleted > ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState > ReplicaStateMachine.replicasInState > Each of the 3 inefficient functions above will iterate through all the > replicas in the cluster, and filter out the replicas belonging to a topic. In > a large cluster with many replicas, these functions can be quite slow. > Total deletion time for a topic becomes long in single threaded controller > processing model: > Since the controller needs to sequentially process the queued > TopicDeletionStopReplicaResponseReceived events, if the time cost to process > one event is t, the total time to process all events for all replicas of a > topic is N * t. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6630) Speed up the processing of StopReplicaResponse events on the controller
Lucas Wang created KAFKA-6630: - Summary: Speed up the processing of StopReplicaResponse events on the controller Key: KAFKA-6630 URL: https://issues.apache.org/jira/browse/KAFKA-6630 Project: Kafka Issue Type: Improvement Components: core Reporter: Lucas Wang Assignee: Lucas Wang Problem Statement: We find that in a large cluster with many partition replicas, it takes a long time to successfully delete a topic. Root cause: Further analysis shows that for a topic with N replicas, the controller receives all the N StopReplicaResponses from brokers within a short time; however, sequentially handling all the N TopicDeletionStopReplicaResponseReceived events one by one takes a long time. Specifically, the functions triggered while handling every single TopicDeletionStopReplicaResponseReceived event include: TopicDeletionStopReplicaResponseReceived.process calls TopicDeletionManager.completeReplicaDeletion, which calls TopicDeletionManager.resumeDeletions, which calls several inefficient functions. The inefficient functions called inside TopicDeletionManager.resumeDeletions include ReplicaStateMachine.areAllReplicasForTopicDeleted ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState ReplicaStateMachine.replicasInState Each of the 3 inefficient functions above will iterate through all the replicas in the cluster and filter out the replicas belonging to a topic. In a large cluster with many replicas, these functions can be quite slow. Total deletion time for a topic becomes long under the single-threaded controller processing model: Since the controller needs to sequentially process the queued TopicDeletionStopReplicaResponseReceived events, if the time cost to process one event is t, the total time to process all events for all replicas of a topic is N * t. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
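A minimal sketch of why the per-event checks are slow and how indexing replica states by topic would help, using simplified stand-in types rather than the real ReplicaStateMachine (the actual fix may differ): the flat version scans every replica in the cluster on each call, while the indexed version only walks the replicas of the topic being deleted.
{code:scala}
// Stand-in type for illustration only.
final case class Replica(topic: String, partition: Int, brokerId: Int)

object ReplicaStateIndexSketch {
  sealed trait State
  case object ReplicaDeletionStarted extends State
  case object ReplicaDeletionSuccessful extends State

  // All replicas in the cluster with their current state (flat, cluster-wide).
  val flatStates: Map[Replica, State] = Map(
    Replica("t0", 0, 1) -> ReplicaDeletionSuccessful,
    Replica("t0", 0, 2) -> ReplicaDeletionStarted,
    Replica("t1", 0, 1) -> ReplicaDeletionStarted)

  // O(all replicas in the cluster) per call: roughly what the three helpers
  // named above effectively do today.
  def replicasInStateFlat(topic: String, state: State): Set[Replica] =
    flatStates.collect { case (r, s) if r.topic == topic && s == state => r }.toSet

  // Indexed by topic once, so each lookup only touches that topic's replicas.
  val statesByTopic: Map[String, Map[Replica, State]] = flatStates.groupBy(_._1.topic)

  def replicasInStateIndexed(topic: String, state: State): Set[Replica] =
    statesByTopic.getOrElse(topic, Map.empty[Replica, State])
      .collect { case (r, s) if s == state => r }.toSet

  def main(args: Array[String]): Unit = {
    println(replicasInStateFlat("t0", ReplicaDeletionStarted))
    println(replicasInStateIndexed("t0", ReplicaDeletionStarted))
  }
}
{code}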
[jira] [Assigned] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
[ https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-6612: - Assignee: Lucas Wang > Added logic to prevent increasing partition counts during topic deletion > > > Key: KAFKA-6612 > URL: https://issues.apache.org/jira/browse/KAFKA-6612 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > Problem: trying to increase the partition count of a topic while the topic > deletion is in progress can cause the topic to be never deleted. > In the current code base, if a topic deletion is still in progress and the > partition count is increased, > the new partition and its replica assignment be created on zookeeper as data > of the path /brokers/topics/. > Upon detecting the change, the controller sees the topic is being deleted, > and therefore ignores the partition change. Therefore the zk path > /brokers/topics//partitions/ will NOT be created. > If a controller switch happens next, the added partition will be detected by > the new controller and stored in the > controllerContext.partitionReplicaAssignment. The new controller then tries > to delete the topic by first transitioning its replicas to OfflineReplica. > However the transition to OfflineReplica state will NOT succeed since there > is no leader for the partition. Since the only state change path for a > replica to be successfully deleted is OfflineReplica -> > ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter > the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion
Lucas Wang created KAFKA-6612: - Summary: Added logic to prevent increasing partition counts during topic deletion Key: KAFKA-6612 URL: https://issues.apache.org/jira/browse/KAFKA-6612 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Problem: trying to increase the partition count of a topic while the topic deletion is in progress can cause the topic to be never deleted. In the current code base, if a topic deletion is still in progress and the partition count is increased, the new partition and its replica assignment be created on zookeeper as data of the path /brokers/topics/. Upon detecting the change, the controller sees the topic is being deleted, and therefore ignores the partition change. Therefore the zk path /brokers/topics//partitions/ will NOT be created. If a controller switch happens next, the added partition will be detected by the new controller and stored in the controllerContext.partitionReplicaAssignment. The new controller then tries to delete the topic by first transitioning its replicas to OfflineReplica. However the transition to OfflineReplica state will NOT succeed since there is no leader for the partition. Since the only state change path for a replica to be successfully deleted is OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter the OfflineReplica state means the replica can never be successfully deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Cretae a cluster with 2 brokers having id 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4._Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the following log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. 
That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) of the logs above. c. At the bottom of the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated. Performance improvement benchmark: if we perform the steps above with topic a0 having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes down, it takes the controller ~4 minutes to go through controlled shutdown, detect the broker failure through ZooKeeper, and transition all replicas to OfflineReplica state. After applying the patch, the same process takes 23 seconds. The testing done: After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. Also I've v
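A minimal sketch of the proposed change, using simplified stand-in types rather than the real ControllerBrokerRequestBatch (the names addUpdateMetadataRequestBefore/After are illustrative only): the "before" shape walks the entire partitionsToBeDeleted set on every call, while the "after" shape only processes the partitions it was asked about and derives the beingDeleted flag with a per-partition set lookup.
{code:scala}
// Stand-in types for illustration only.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionInfo(tp: TopicPartition, beingDeleted: Boolean)

object UpdateMetadataBatchSketch {
  // Stand-in for TopicDeletionManager.partitionsToBeDeleted (topic a0, 10 partitions).
  val partitionsToBeDeleted: Set[TopicPartition] =
    (0 until 10).map(TopicPartition("a0", _)).toSet

  // Before: every call also drags in all partitions pending deletion.
  def addUpdateMetadataRequestBefore(partitions: Set[TopicPartition]): Seq[PartitionInfo] =
    partitions.toSeq.map(tp => PartitionInfo(tp, beingDeleted = false)) ++
      partitionsToBeDeleted.toSeq.map(tp => PartitionInfo(tp, beingDeleted = true))

  // After: only the partitions passed in are processed.
  def addUpdateMetadataRequestAfter(partitions: Set[TopicPartition]): Seq[PartitionInfo] =
    partitions.toSeq.map(tp => PartitionInfo(tp, beingDeleted = partitionsToBeDeleted.contains(tp)))

  def main(args: Array[String]): Unit = {
    val affected = Set(TopicPartition("a1", 0), TopicPartition("a1", 1))
    println(addUpdateMetadataRequestBefore(affected).size) // 12: includes all a0 partitions
    println(addUpdateMetadataRequestAfter(affected).size)  // 2: only the affected partitions
  }
}
{code}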
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Cretae a cluster with 2 brokers having id 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4._Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the following log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. 
That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) logs above. c. At the bottom of the the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated. Performance improvement benchmark: if we perform the steps above with topic a0 having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes down, it takes the controller ~4 minutes for to controller to go through controlled shutdown, detect the broker failure through zookeeper, and transition all replicas to OfflineReplica state. After applying the patch, the same process takes 23 seconds. The testing done: After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 parti
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Cretae a cluster with 2 brokers having id 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4._Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the following log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. 
That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) logs above. c. At the bottom of the the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated. Performance improvement benchmark: if we perform the steps above with topic a0 having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes down, it takes the controller ~4 minutes for to controller to go through controlled shutdown, detect the broker failure through zookeeper, and transition all replicas to OfflineReplica state. After applying the patch, the same process takes 23 seconds. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. Also I've
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Cretae a cluster with 2 brokers having id 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4._Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the following log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. 
That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) logs above. c. At the bottom of the the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. Also I've verified that topic deletion for topic a1 still works fine. was: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Cretae a cluster with 2 brokers having id 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4._Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n log messages above will be generated. 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 8. Verify that the following log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. 
That in turn generates 100 (10 x 10) entries of the logs above. b. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) logs above. c. At the bottom of the the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. In general, when we have n partitions accumulated in the variable partitionsToBeDeleted, and a broker with m partitions becomes offline, 2 * m * n + n logs will be generated. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. Also I've verified that topic deletion for topic a1 still works fine. was: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-del
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers _inside a for-loop for each partition_ ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Cretae a cluster with 2 brokers having id 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4._Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_ 5. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 6. Verify that the log message in step 4 appears *100 more* times (). This is because we have the following stack trace: addUpdateMetadataRequestForBrokers addLeaderAndIsrRequestForBrokers _inside a for-loop for each create response_ initializeLeaderAndIsrForPartitions 5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 6. Verify that the following log message in step 4 appears another *210* times. This is because a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs above. b. 
When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) logs above. c. At the bottom of the the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which generates 10 logs, one for each partition in the a0 topic. According to the analysis in step 6, when we have n partitions accumulated in the variable partitionsToBeDeleted, and a broker with m partitions becomes offline, 2 * m * n + n logs in step 4 will be generated. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. Also I've verified that topic deletion for topic a1 still works fine. was: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead
[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang updated KAFKA-6481: -- Description: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers inside a for-loop for each partition ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Cretae a cluster with 2 brokers having id 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0 4. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 6. Verify that the following log message appear over 200 times in the controller.log file, one for each iteration of the a0 partitions "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest." What happened was 1. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs above. 2. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) entries of the logs above. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. 
Also I've verified that topic deletion for topic a1 still works fine. was: The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 needs to be transitioned to OfflineReplica state, possibly because of a broker going offline, a call stack listed as following will happen on the controller, causing a iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers C
[jira] [Assigned] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
[ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Wang reassigned KAFKA-6481: - Assignee: Lucas Wang > Improving performance of the function > ControllerChannelManager.addUpdateMetadataRequestForBrokers > - > > Key: KAFKA-6481 > URL: https://issues.apache.org/jira/browse/KAFKA-6481 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Minor > > The function ControllerChannelManager.addUpdateMetadataRequestForBrokers > should only process the partitions specified in the partitions parameter, > i.e. the 2nd parameter, and avoid iterating through the set of partitions in > TopicDeletionManager.partitionsToBeDeleted. > > Here is why the current code can be a problem: > The number of partitions-to-be-deleted stored in the field > TopicDeletionManager.partitionsToBeDeleted can become quite large under > certain scenarios. For instance, if a topic a0 has dead replicas, the topic > a0 would be marked as ineligible for deletion, and its partitions will be > retained in the field TopicDeletionManager.partitionsToBeDeleted for future > retries. > With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, > if some replicas in another topic a1 needs to be transitioned to > OfflineReplica state, possibly because of a broker going offline, a call > stack listed as following will happen on the controller, causing a iteration > of the whole partitions-to-be-deleted set for every single affected partition. > controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition > => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) > ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers > ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers > inside a for-loop for each partition > ReplicaStateMachine.doHandleStateChanges > ReplicaStateMachine.handleStateChanges > KafkaController.onReplicasBecomeOffline > KafkaController.onBrokerFailure > How to reproduce the problem: > 1. Cretae a cluster with 2 brokers having id 1 and 2 > 2. Create a topic having 10 partitions and deliberately assign the replicas > to non-existing brokers, i.e. > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 > --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` > 3. Delete the topic and cause all of its partitions to be retained in the > field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead > replicas, and is ineligible for deletion. > 4. Create another topic a1 also having 10 partitions, i.e. > ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 > --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` > 5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to > OfflineReplica state on the controller. > 6. Verify that the following log message appear over 200 times in the > controller.log file, one for each iteration of the a0 partitions > "Leader not yet assigned for partition [a0,..]. Skip sending > UpdateMetadataRequest." > > What happened was > 1. During controlled shutdown, the function > KafkaController.doControlledShutdown calls > replicaStateMachine.handleStateChanges to transition all the replicas on > broker 2 to OfflineState. That in turn generates 100 (10 x 10) entries of the > logs above. > 2. 
When the broker zNode is gone in ZK, the function > KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline > to transition all the replicas on broker 2 to OfflineState. And this again > generates 100 (10 x 10) entries of the logs above. > After applying the patch in this RB, I've verified that by going through the > steps above, broker 2 going offline NO LONGER generates log entries for the > a0 partitions. > Also I've verified that topic deletion for topic a1 still works fine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
Lucas Wang created KAFKA-6481: -- Summary: Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers Key: KAFKA-6481 URL: https://issues.apache.org/jira/browse/KAFKA-6481 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in the partitions parameter, i.e. the 2nd parameter, and avoid iterating through the set of partitions in TopicDeletionManager.partitionsToBeDeleted. Here is why the current code can be a problem: The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic a0 will be marked as ineligible for deletion, and its partitions will be retained in the field TopicDeletionManager.partitionsToBeDeleted for future retries. With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 need to be transitioned to OfflineReplica state, possibly because of a broker going offline, the call stack listed below will happen on the controller, causing an iteration of the whole partitions-to-be-deleted set for every single affected partition. controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true)) ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers inside a for-loop for each partition ReplicaStateMachine.doHandleStateChanges ReplicaStateMachine.handleStateChanges KafkaController.onReplicasBecomeOffline KafkaController.onBrokerFailure How to reproduce the problem: 1. Create a cluster with 2 brokers having ids 1 and 2 2. Create a topic having 10 partitions and deliberately assign the replicas to non-existing brokers, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done` 3. Delete the topic and cause all of its partitions to be retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, and is ineligible for deletion. 4. Create another topic a1 also having 10 partitions, i.e. ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done` 5. Kill broker 2 and cause the replicas on broker 2 to be transitioned to OfflineReplica state on the controller. 6. Verify that the following log message appears over 200 times in the controller.log file, one for each iteration of the a0 partitions: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest." What happened was: 1. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs above. 2. When the broker zNode is gone in ZK, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineState. And this again generates 100 (10 x 10) entries of the logs above. After applying the patch in this RB, I've verified that by going through the steps above, broker 2 going offline NO LONGER generates log entries for the a0 partitions. 
Also I've verified that topic deletion for topic a1 still works fine. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
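To make the 200-entry count in step 6 concrete: before the patch, the control flow amounts to the nested loop modeled below, where each of the 10 a1 replicas being transitioned to OfflineReplica walks all 10 retained a0 partitions and logs the "Leader not yet assigned" line for each, once during controlled shutdown and once again when the broker zNode disappears. This is a toy model of the counting only, not controller code.
{code:scala}
// Toy model of the pre-patch behavior; it just counts the log lines described in step 6.
object LogLineCount {
  def main(args: Array[String]): Unit = {
    val a1ReplicasOnBroker2  = 10 // replicas transitioned to OfflineReplica
    val retainedA0Partitions = 10 // entries in partitionsToBeDeleted
    val passes               = 2  // controlled shutdown + broker zNode deletion

    var logLines = 0
    for (_ <- 1 to passes; _ <- 1 to a1ReplicasOnBroker2; _ <- 1 to retainedA0Partitions)
      logLines += 1 // "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."

    println(logLines) // 200
  }
}
{code}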