[jira] [Created] (KAFKA-14424) Cancellation of an ongoing replica reassignment should have sanity checks

2022-11-29 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-14424:
--

 Summary: Cancellation of an ongoing replica reassignment should 
have sanity checks
 Key: KAFKA-14424
 URL: https://issues.apache.org/jira/browse/KAFKA-14424
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


When reassigning replicas, Kafka runs a sanity check to ensure all of the 
target replicas are alive before allowing the reassignment request to proceed.
However, for an AlterPartitionReassignments request that cancels an ongoing 
reassignment, there is no such check.
As a result, if the original replicas are offline, the cancellation may leave 
partitions without any leader. This problem has been observed in our clusters.

 

There should be a sanity check to ensure the cancellation would also land the 
partitions in valid states, e.g. by ensuring that all of the original replicas 
are alive.
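
For illustration only, a minimal, hedged sketch of the kind of check this 
implies, written against the public Admin API (the bootstrap address is a 
placeholder, and the actual fix would presumably live in the controller's 
handling of AlterPartitionReassignments rather than in a client):
{code:java}
import java.util.*;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

public class SafeCancelReassignments {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Brokers currently alive in the cluster.
            Set<Integer> alive = admin.describeCluster().nodes().get()
                .stream().map(Node::id).collect(Collectors.toSet());
            // All ongoing reassignments.
            Map<TopicPartition, PartitionReassignment> ongoing =
                admin.listPartitionReassignments().reassignments().get();

            Map<TopicPartition, Optional<NewPartitionReassignment>> cancellations = new HashMap<>();
            for (Map.Entry<TopicPartition, PartitionReassignment> e : ongoing.entrySet()) {
                // The "original" replicas are the current replicas minus the adding ones.
                List<Integer> original = new ArrayList<>(e.getValue().replicas());
                original.removeAll(e.getValue().addingReplicas());
                if (alive.containsAll(original)) {
                    cancellations.put(e.getKey(), Optional.empty()); // empty Optional = cancel
                } else {
                    System.out.println("Not cancelling " + e.getKey()
                        + ": original replicas " + original + " are not all alive");
                }
            }
            if (!cancellations.isEmpty()) {
                admin.alterPartitionReassignments(cancellations).all().get();
            }
        }
    }
}
{code}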



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-14381) Support listing all partitions being reassigned in a cluster

2022-11-10 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-14381:
--

Assignee: Lucas Wang

> Support listing all partitions being reassigned in a cluster
> 
>
> Key: KAFKA-14381
> URL: https://issues.apache.org/jira/browse/KAFKA-14381
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> The current implementation of kafka-topics.sh doesn't support listing all of 
> the partitions that are being reassigned within a cluster.
> Showing such info can be really useful during troubleshooting.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14381) Support listing all partitions being reassigned in a cluster

2022-11-10 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-14381:
--

 Summary: Support listing all partitions being reassigned in a 
cluster
 Key: KAFKA-14381
 URL: https://issues.apache.org/jira/browse/KAFKA-14381
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


The current implementation of kafka-topics.sh doesn't support listing all of 
the partitions that are being reassigned within a cluster.

Showing such info can be really useful during troubleshooting.
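
For reference, the information is already available through the public Admin 
API; below is a minimal, hedged sketch of the kind of listing the tool could 
print (the bootstrap address is a placeholder, and this is not the actual 
kafka-topics.sh change):
{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.PartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class ListReassignments {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Every partition with an ongoing reassignment in the cluster.
            Map<TopicPartition, PartitionReassignment> ongoing =
                admin.listPartitionReassignments().reassignments().get();
            ongoing.forEach((tp, r) ->
                System.out.printf("%s replicas=%s adding=%s removing=%s%n",
                    tp, r.replicas(), r.addingReplicas(), r.removingReplicas()));
        }
    }
}
{code}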



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14213) Reduce lock contention between control-plane-kafka-request-handler and kafka-log-cleaner-thread

2022-09-09 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-14213:
---
Description: 
We found that the StopReplica request's local processing time may be quite long 
due to the reasons explained below. The impact of a time-consuming StopReplica 
request is that all subsequent requests from the controller are blocked, 
causing slow convergence on the metadata plane.

 

The long local processing time is because the 
control-plane-kafka-request-handler thread is blocked on a lock while trying to 
abort log cleaning, with the following stack trace:

 
{code:java}
"control-plane-kafka-request-handler-0"
   java.lang.Thread.State: WAITING
at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method)
at 
java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
at 
java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
at 
java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
at 
kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266)
at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205)
at kafka.log.LogManager.asyncDelete(LogManager.scala:1039) {code}
 

 

In the meantime, the kafka-log-cleaner-thread is holding the lock and is busy 
figuring out the filthiest compacted log, which can be a very time-consuming 
task. Our periodic thread dumps captured many snapshots with the 
kafka-log-cleaner-thread in either of the following two stack traces:

 

 
{code:java}
"kafka-log-cleaner-thread-0"
   java.lang.Thread.State: RUNNABLE
at kafka.log.TimeIndex.(TimeIndex.scala:57)
at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109)
at kafka.log.LazyIndex$$$Lambda$1871/0x000800f4d840.apply(Unknown 
Source)
at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63)
at kafka.log.LazyIndex.get(LazyIndex.scala:60)
at kafka.log.LogSegment.timeIndex(LogSegment.scala:66)
at kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107)
at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113)
at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640)
at 
kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617)
at 
kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616)
at 
kafka.log.LogCleanerManager$$$Lambda$2215/0x00080104d040.apply(Unknown 
Source)
at scala.collection.Iterator.find(Iterator.scala:993)
at scala.collection.Iterator.find$(Iterator.scala:990)
at scala.collection.AbstractIterator.find(Iterator.scala:1429)
at scala.collection.IterableLike.find(IterableLike.scala:81)
at scala.collection.IterableLike.find$(IterableLike.scala:80)
at scala.collection.AbstractIterable.find(Iterable.scala:56)
at 
kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616)
at 
kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186)
at 
kafka.log.LogCleanerManager$$Lambda$2212/0x00080104f040.apply(Unknown 
Source)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at 
scala.collection.TraversableLike$$Lambda$891/0x000800a89840.apply(Unknown 
Source)
at scala.collection.immutable.List.foreach(List.scala:392) {code}
 
{code:java}
"kafka-log-cleaner-thread-0"
   java.lang.Thread.State: RUNNABLE
at java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method)
at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54)
at 
java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274)
at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245)
at 
java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811)
at 
java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796)
at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114)
at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087)
at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69)
at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42)
at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
at 
org.apache.kafka.common.record.Re

[jira] [Created] (KAFKA-14213) Reduce lock contention between control-plane-kafka-request-handler and kafka-log-cleaner-thread

2022-09-09 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-14213:
--

 Summary: Reduce lock contention between 
control-plane-kafka-request-handler and kafka-log-cleaner-thread
 Key: KAFKA-14213
 URL: https://issues.apache.org/jira/browse/KAFKA-14213
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


We found that the StopReplica request's local processing time may be quite long 
due to the reasons explained below. The impact of a time-consuming StopReplica 
request is that all subsequent requests from the controller are blocked, 
causing slow convergence on the metadata plane.

 

The long local processing time is because the 
control-plane-kafka-request-handler thread is blocked on a lock while trying to 
abort log cleaning, with the following stack trace:

 
{code:java}
"control-plane-kafka-request-handler-0"
   java.lang.Thread.State: WAITING
    at java.base@11.0.13/jdk.internal.misc.Unsafe.park(Native Method)
    at java.base@11.0.13/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
    at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
    at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:917)
    at java.base@11.0.13/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1240)
    at java.base@11.0.13/java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:267)
    at kafka.log.LogCleanerManager.abortCleaning(LogCleanerManager.scala:266)
    at kafka.log.LogCleaner.abortCleaning(LogCleaner.scala:205)
    at kafka.log.LogManager.asyncDelete(LogManager.scala:1039)
{code}
 

 

In the meantime, the kafka-log-cleaner-thread is holding the lock and is busy 
figuring out the filthiest compacted log, which can be a very time-consuming 
task. Our periodic thread dumps captured many snapshots with the 
kafka-log-cleaner-thread in either of the following two stack traces:

 

 
{code:java}
"kafka-log-cleaner-thread-0"
   java.lang.Thread.State: RUNNABLE
    at kafka.log.TimeIndex.(TimeIndex.scala:57)
    at kafka.log.LazyIndex$.$anonfun$forTime$1(LazyIndex.scala:109)
    at kafka.log.LazyIndex$$$Lambda$1871/0x000800f4d840.apply(Unknown Source)
    at kafka.log.LazyIndex.$anonfun$get$1(LazyIndex.scala:63)
    at kafka.log.LazyIndex.get(LazyIndex.scala:60)
    at kafka.log.LogSegment.timeIndex(LogSegment.scala:66)
    at kafka.log.LogSegment.maxTimestampAndOffsetSoFar(LogSegment.scala:107)
    at kafka.log.LogSegment.maxTimestampSoFar(LogSegment.scala:113)
    at kafka.log.LogSegment.largestTimestamp(LogSegment.scala:640)
    at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4(LogCleanerManager.scala:617)
    at kafka.log.LogCleanerManager$.$anonfun$cleanableOffsets$4$adapted(LogCleanerManager.scala:616)
    at kafka.log.LogCleanerManager$$$Lambda$2215/0x00080104d040.apply(Unknown Source)
    at scala.collection.Iterator.find(Iterator.scala:993)
    at scala.collection.Iterator.find$(Iterator.scala:990)
    at scala.collection.AbstractIterator.find(Iterator.scala:1429)
    at scala.collection.IterableLike.find(IterableLike.scala:81)
    at scala.collection.IterableLike.find$(IterableLike.scala:80)
    at scala.collection.AbstractIterable.find(Iterable.scala:56)
    at kafka.log.LogCleanerManager$.cleanableOffsets(LogCleanerManager.scala:616)
    at kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$4(LogCleanerManager.scala:186)
    at kafka.log.LogCleanerManager$$Lambda$2212/0x00080104f040.apply(Unknown Source)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.TraversableLike$$Lambda$891/0x000800a89840.apply(Unknown Source)
    at scala.collection.immutable.List.foreach(List.scala:392)
{code}
 

 

 
{code:java}
"kafka-log-cleaner-thread-0" java.lang.Thread.State: RUNNABLE at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread0(Native Method) at 
java.base@11.0.13/sun.nio.ch.FileDispatcherImpl.pread(FileDispatcherImpl.java:54)
 at java.base@11.0.13/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:274) 
at java.base@11.0.13/sun.nio.ch.IOUtil.read(IOUtil.java:245) at 
java.base@11.0.13/sun.nio.ch.FileChannelImpl.readInternal(FileChannelImpl.java:811)
 at java.base@11.0.13/sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:796) 
at org.apache.kafka.common.utils.Utils.readFully(Utils.java:1114) at 
org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:1087) at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:69)
 at 
org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:42)
 at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
 at 
org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
 at 
org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
 at 
o

[jira] [Assigned] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted

2022-04-11 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-13815:
--

Assignee: Lucas Wang

> Avoid reinitialization for a replica that is being deleted
> --
>
> Key: KAFKA-13815
> URL: https://issues.apache.org/jira/browse/KAFKA-13815
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-10002
> identified that deletion of replicas can be slow when a StopReplica request 
> is being processed, and implemented a change to improve the efficiency.
> We found that the efficiency can be further improved by avoiding the 
> reinitialization of the
> leader epoch cache and partition metadata for a replica that needs to be 
> deleted.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13815) Avoid reinitialization for a replica that is being deleted

2022-04-11 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-13815:
--

 Summary: Avoid reinitialization for a replica that is being deleted
 Key: KAFKA-13815
 URL: https://issues.apache.org/jira/browse/KAFKA-13815
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


https://issues.apache.org/jira/browse/KAFKA-10002
identified that deletion of replicas can be slow when a StopReplica request is 
being processed, and implemented a change to improve the efficiency.

We found that the efficiency can be further improved by avoiding the 
reinitialization of the leader epoch cache and partition metadata for a replica 
that needs to be deleted.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13797) Adding metric to indicate metadata response outgoing bytes rate

2022-04-05 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13797?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-13797:
---
Description: 
It's not a common case, but we experienced the following problem in one of our 
clusters.

The use case involves dynamically creating and deleting topics in the cluster, 
and the clients were constantly using the special type of Metadata requests 
whose topics field is null in order to retrieve all topics before checking a 
topic's existence.

A high rate of such Metadata requests generated a heavy load on brokers in the 
cluster. 

Yet, currently, there is no metric to indicate the metadata response outgoing 
bytes rate.

We propose to add such a metric in order to make the troubleshooting of such 
cases easier.

  was:
It's not a common case, but we experienced the following problem in one of our 
clusters.

The use case involves dynamically creating and deleting topics in the cluster, 
and the clients were constantly checking if a topic exists in a cluster using 
the special type of Metadata requests whose topics field is null in order to 
retrieve all topics before checking a topic's existence.

A high rate of such Metadata requests generated a heavy load on brokers in the 
cluster. 

Yet, currently, there is no metric to indicate the metadata response outgoing 
bytes rate.

We propose to add such a metric in order to make the troubleshooting of such 
cases easier.


> Adding metric to indicate metadata response outgoing bytes rate
> ---
>
> Key: KAFKA-13797
> URL: https://issues.apache.org/jira/browse/KAFKA-13797
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Priority: Minor
>
> It's not a common case, but we experienced the following problem in one of 
> our clusters.
> The use case involves dynamically creating and deleting topics in the 
> cluster, and the clients were constantly using the special type of Metadata 
> requests whose topics field is null in order to retrieve all topics before 
> checking a topic's existence.
> A high rate of such Metadata requests generated a heavy load on brokers in 
> the cluster. 
> Yet, currently, there is no metric to indicate the metadata response outgoing 
> bytes rate.
> We propose to add such a metric in order to make the troubleshooting of such 
> cases easier.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13797) Adding metric to indicate metadata response outgoing bytes rate

2022-04-04 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-13797:
--

 Summary: Adding metric to indicate metadata response outgoing 
bytes rate
 Key: KAFKA-13797
 URL: https://issues.apache.org/jira/browse/KAFKA-13797
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


It's not a common case, but we experienced the following problem in one of our 
clusters.

The use case involves dynamically creating and deleting topics in the cluster, 
and the clients were constantly checking if a topic exists in a cluster using 
the special type of Metadata requests whose topics field is null in order to 
retrieve all topics before checking a topic's existence.

A high rate of such Metadata requests generated a heavy load on brokers in the 
cluster. 

Yet, currently, there is no metric to indicate the metadata response outgoing 
bytes rate.

We propose to add such a metric in order to make the troubleshooting of such 
cases easier.
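
For context, a minimal, hedged sketch of the client pattern described above, 
i.e. checking a topic's existence by fetching metadata for all topics (the 
bootstrap address and topic name are placeholders):
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class TopicExistenceCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // listTopics() asks the brokers for metadata on every topic in the
            // cluster, which is the kind of request that generated the heavy load.
            boolean exists = admin.listTopics().names().get().contains("my-topic");
            System.out.println("my-topic exists: " + exists);
        }
    }
}
{code}
The proposed metric would make it easier to see when the responses to such 
requests dominate the brokers' outgoing bytes.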



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13188) Release the memory back into MemoryPool

2021-08-10 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-13188:
--

 Summary: Release the memory back into MemoryPool
 Key: KAFKA-13188
 URL: https://issues.apache.org/jira/browse/KAFKA-13188
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


Tushar made a [hotfix change|https://github.com/linkedin/kafka/pull/186] to the 
linkedin/kafka repo hosting Apache Kafka 2.4.

The change releases memory back to the MemoryPool for the Kafka consumer, and 
his benchmark showed a significant improvement, with less memory graduating from 
the Young Gen and being promoted to the Old Gen.

Given the benefit, the change should also be added to trunk.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10548) Implement deletion logic for LeaderAndIsrRequests

2021-07-27 Thread Lucas Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388367#comment-17388367
 ] 

Lucas Wang commented on KAFKA-10548:


Hi [~jolshan], thanks for KIP-516 and for keeping track of the tasks. At 
LinkedIn, we are very interested in adopting the improved topic deletion.

Have you started or planned to work on this ticket? If not, do you mind if I 
take it over?

> Implement deletion logic for LeaderAndIsrRequests
> -
>
> Key: KAFKA-10548
> URL: https://issues.apache.org/jira/browse/KAFKA-10548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Priority: Major
>
> This will allow for specialized deletion logic when receiving 
> LeaderAndIsrRequests
> Will also create and utilize delete.stale.topic.delay.ms configuration option



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException

2021-02-08 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-12315:
---
Description: 
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
 1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
 adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
 due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:
{code:java}
2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
 org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139{code}
5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned; however, the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the 
leftover state:

 
{code:java}
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Error while moving some replicas to OnlineReplica state
 java.lang.IllegalStateException: Controller to broker state change requests 
batch is not empty while creating a new one. Some StopReplica state changes 
Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
 at 
kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.elect(KafkaController.scala:1484) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.process(KafkaController.scala:2065) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
[scala-library-2.12.10.jar:?]
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) 
[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 [kafka_2.12-2.4.1.10.jar:?]
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
[kafka_2.12-2.4.1.10.jar:?]{code}
Essentially, the controller is not able to transition some replicas to 
OnlineReplica state, and it cannot send any requests to any brokers via the 
ReplicaStateMachine.

  was:
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
 1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
 adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
 due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
 org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned, however the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the left 
over state:

 
{code:java}
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMa

[jira] [Updated] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException

2021-02-08 Thread Lucas Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12315?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-12315:
---
Description: 
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
 1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
 adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
 due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
 org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned; however, the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the 
leftover state:

 
{code:java}
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Error while moving some replicas to OnlineReplica state
 java.lang.IllegalStateException: Controller to broker state change requests 
batch is not empty while creating a new one. Some StopReplica state changes 
Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
 at 
kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.elect(KafkaController.scala:1484) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.KafkaController.process(KafkaController.scala:2065) 
~[kafka_2.12-2.4.1.10.jar:?]
 at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) 
~[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 ~[kafka_2.12-2.4.1.10.jar:?]
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
[scala-library-2.12.10.jar:?]
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) 
[kafka_2.12-2.4.1.10.jar:?]
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 [kafka_2.12-2.4.1.10.jar:?]
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
[kafka_2.12-2.4.1.10.jar:?]{code}


 Essentially, the controller is not able to transition some replicas to 
OnlineReplica state, and it cannot send any requests to any brokers via the 
ReplicaStateMachine.

  was:
As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
   adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
   due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned, however the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the left 
over state:

```
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=3

[jira] [Created] (KAFKA-12315) Clearing the ZkReplicaStateMachine request batch state upon ControllerMovedException

2021-02-08 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-12315:
--

 Summary: Clearing the ZkReplicaStateMachine request batch state 
upon ControllerMovedException
 Key: KAFKA-12315
 URL: https://issues.apache.org/jira/browse/KAFKA-12315
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
 Attachments: controller_moved_left_over_state.png

As shown in the attached sequence diagram, during topic deletion the following 
sequence of events can happen
1. The ZkReplicaStateMachine calls 
AbstractControllerBrokerRequestBatch.addStopReplicaRequestForBrokers and
   adds some entries to its stopReplicaRequestMap

2. The ZkReplicaStateMachine then tries to call KafkaZkClient.updateLeaderAndIsr

3. Inside KafkaZkClient$.unwrapResponseWithControllerEpochCheck, a 
ControllerMovedException may be thrown
   due to zkVersion check failure

4. The ControllerMovedException is captured by the ZkPartitionStateMachine and 
an error such as the following is created:

2021/02/05 04:34:45.302 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Controller moved to another broker when moving some 
replicas to OfflineReplica state
org.apache.kafka.common.errors.ControllerMovedException: Controller epoch 
zkVersion check fails. Expected zkVersion = 139

5. The ControllerMovedException is rethrown and captured by the 
KafkaController, which will resign

At this point, the controller has resigned; however, the stopReplicaRequestMap 
state populated in step 1 hasn't been cleared.

Later on, when the controller wins an election and becomes the active 
controller again, an IllegalStateException will be triggered due to the 
leftover state:

```
2021/02/05 16:04:33.193 [ZkReplicaStateMachine] [ReplicaStateMachine 
controllerId=31862] Error while moving some replicas to OnlineReplica state
java.lang.IllegalStateException: Controller to broker state change requests 
batch is not empty while creating a new one. Some StopReplica state changes 
Map(6121 -> ListBuffer(StopReplicaRequestInfo([Topic=,Partition=2,Replica=6121],false))) might be lost
at 
kafka.controller.AbstractControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:383)
 ~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:109)
 ~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:40) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:365)
 ~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.elect(KafkaController.scala:1484) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.KafkaController.processReelect(KafkaController.scala:1972) 
~[kafka_2.12-2.4.1.10.jar:?]
at kafka.controller.KafkaController.process(KafkaController.scala:2065) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53) 
~[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 ~[kafka_2.12-2.4.1.10.jar:?]
at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) 
[scala-library-2.12.10.jar:?]
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31) 
[kafka_2.12-2.4.1.10.jar:?]
at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 [kafka_2.12-2.4.1.10.jar:?]
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) 
[kafka_2.12-2.4.1.10.jar:?]
```
Essentially, the controller is not able to transition some replicas to 
OnlineReplica state, and it cannot send any requests to any brokers via the 
ReplicaStateMachine.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10751) Generate log to help estimate messages lost during ULE

2020-11-19 Thread Lucas Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235702#comment-17235702
 ] 

Lucas Wang commented on KAFKA-10751:


PR submitted: https://github.com/apache/kafka/pull/9533

> Generate log to help estimate messages lost during ULE
> --
>
> Key: KAFKA-10751
> URL: https://issues.apache.org/jira/browse/KAFKA-10751
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> During Unclean Leader Election, there could be data loss due to truncation at 
> the resigned leader.
> Suppose there are 3 brokers that have replicas for a given partition:
> Broker A (leader) with largest offset 9 (log end offset 10)
> Broker B (follower) with largest offset 4 (log end offset 5)
> Broker C (follower) with largest offset 1 (log end offset 2)
> Only the leader A is in the ISR with B and C lagging behind.
> Now an unclean leader election causes the leadership to be transferred to C. 
> Broker A would need to truncate 8 messages, and Broker B 3 messages.
> Case 1: if these messages have been produced with acks=0 or 1, then clients 
> would experience 8 lost messages.
> Case 2: if the client is using acks=all and the partition's minISR setting is 
> 2, and further let's assume broker B dropped out of the ISR after receiving 
> the message with offset 4, then only the messages with offset<=4 have been 
> acked to the client. The truncation effectively causes the client to lose 3 
> messages.
> Knowing the exact amount of data loss involves knowing the client's acks 
> setting when the messages are produced, and also whether the messages have 
> been sufficiently replicated according to the MinISR setting.
> If getting the exact data loss is too involved, at least there should be logs 
> to help ESTIMATE the amount of data loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10751) Generate log to help estimate messages lost during ULE

2020-11-19 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-10751:
--

 Summary: Generate log to help estimate messages lost during ULE
 Key: KAFKA-10751
 URL: https://issues.apache.org/jira/browse/KAFKA-10751
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


During Unclean Leader Election, there could be data loss due to truncation at 
the resigned leader.

Suppose there are 3 brokers that have replicas for a given partition:
Broker A (leader) with largest offset 9 (log end offset 10)
Broker B (follower) with largest offset 4 (log end offset 5)
Broker C (follower) with largest offset 1 (log end offset 2)

Only the leader A is in the ISR with B and C lagging behind.
Now an unclean leader election causes the leadership to be transferred to C. 
Broker A would need to truncate 8 messages, and Broker B 3 messages.

Case 1: if these messages have been produced with acks=0 or 1, then clients 
would experience 8 lost messages.
Case 2: if the client is using acks=all and the partition's minISR setting is 
2, and further let's assume broker B dropped out of the ISR after receiving the 
message with offset 4, then only the messages with offset<=4 have been acked to 
the client. The truncation effectively causes the client to lose 3 messages.

Knowing the exact amount of data loss involves knowing the client's acks 
setting when the messages are produced, and also whether the messages have been 
sufficiently replicated according to the MinISR setting.


If getting the exact data loss is too involved, at least there should be logs 
to help ESTIMATE the amount of data loss.
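
For illustration, a tiny, hedged sketch of the arithmetic in the example above 
(the offsets are the ones given in the description, not output from a real log):
{code:java}
public class UncleanElectionLossEstimate {
    public static void main(String[] args) {
        long leoA = 10; // broker A (old leader), largest offset 9
        long leoB = 5;  // broker B (follower), largest offset 4
        long leoC = 2;  // broker C (new unclean leader), largest offset 1

        long truncatedAtA = leoA - leoC; // 8 messages truncated on broker A
        long truncatedAtB = leoB - leoC; // 3 messages truncated on broker B

        // Case 1: acks=0 or 1 -- everything A accepted beyond C's log end is lost.
        long lostWithAcks1 = truncatedAtA; // 8 messages
        // Case 2: acks=all, minISR=2, B left the ISR after offset 4 -- only
        // offsets <= 4 were acked, so the acked-but-lost messages are offsets 2..4.
        long lostWithAcksAll = leoB - leoC; // 3 messages

        System.out.printf("truncated: A=%d B=%d; lost: acks=1 -> %d, acks=all -> %d%n",
            truncatedAtA, truncatedAtB, lostWithAcks1, lostWithAcksAll);
    }
}
{code}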




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10734) Speedup the processing of LeaderAndIsr request

2020-11-18 Thread Lucas Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235061#comment-17235061
 ] 

Lucas Wang commented on KAFKA-10734:


Thanks for your reply [~junrao]. In some clusters, our SREs set the 
num.replica.fetchers to be much greater than 1, probably for the sake of higher 
replication throughput.

> Speedup the processing of LeaderAndIsr request
> --
>
> Key: KAFKA-10734
> URL: https://issues.apache.org/jira/browse/KAFKA-10734
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> Consider the case where a LeaderAndIsr request contains many partitions, of 
> which the broker is asked to become the follower. Let's call these partitions 
> *partitionsToMakeFollower*. Furthermore, let's assume the cluster has n 
> brokers and each broker is configured to have m replica fetchers (via the 
> num.replica.fetchers config). 
> The broker is likely to have (n-1) * m fetcher threads.
> Processing the LeaderAndIsr request requires
> 1. removing the "partitionsToMakeFollower" from all of the fetcher threads 
> sequentially so that they won't be fetching from obsolete leaders.
> 2. adding the "partitionsToMakeFollower" to all of the fetcher threads 
> sequentially
> 3. shutting down the idle fetcher threads sequentially (by checking the 
> number of partitions held by each fetcher thread)
> On top of that, for each of the 3 operations above, the operation is handled 
> by the request handler thread (i.e. io thread). And to complete the 
> operation, the request handler thread needs to contend for the 
> "partitionMapLock" with the corresponding fetcher thread. In the worst case, 
> the request handler thread is blocked for (n-1) * m times for removing the 
> partitions, another (n-1) * m times for adding the partitions, and yet 
> another (n-1) * m times for shutting down the idle fetcher threads.
> Overall, all of the blocking can result in a significant delay in processing 
> the LeaderAndIsr request. The further implication is that if the follower 
> delays its fetching from the leader, there could be under MinISR partitions 
> in the cluster, causing unavailability for clients.
> This ticket is created to track speedup in the processing of the LeaderAndIsr 
> request.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10734) Speedup the processing of LeaderAndIsr request

2020-11-17 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-10734:
--

 Summary: Speedup the processing of LeaderAndIsr request
 Key: KAFKA-10734
 URL: https://issues.apache.org/jira/browse/KAFKA-10734
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


Consider the case where a LeaderAndIsr request contains many partitions, of 
which the broker is asked to become the follower. Let's call these partitions 
*partitionsToMakeFollower*. Furthermore, let's assume the cluster has n 
brokers and each broker is configured to have m replica fetchers (via the 
num.replica.fetchers config). 
The broker is likely to have (n-1) * m fetcher threads.
Processing the LeaderAndIsr request requires
1. removing the "partitionsToMakeFollower" from all of the fetcher threads 
sequentially so that they won't be fetching from obsolete leaders.

2. adding the "partitionsToMakeFollower" to all of the fetcher threads 
sequentially

3. shutting down the idle fetcher threads sequentially (by checking the number 
of partitions held by each fetcher thread)

On top of that, for each of the 3 operations above, the operation is handled by 
the request handler thread (i.e. io thread). And to complete the operation, the 
request handler thread needs to contend for the "partitionMapLock" with the 
corresponding fetcher thread. In the worst case, the request handler thread is 
blocked for (n-1) * m times for removing the partitions, another (n-1) * m 
times for adding the partitions, and yet another (n-1) * m times for shutting 
down the idle fetcher threads.

Overall, all of the blocking can result in a significant delay in processing 
the LeaderAndIsr request. The further implication is that if the follower 
delays its fetching from the leader, there could be under MinISR partitions in 
the cluster, causing unavailability for clients.

This ticket is created to track speedup in the processing of the LeaderAndIsr 
request.
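
As a rough, hedged illustration of the arithmetic above (n and m are made-up 
values, not measurements from a real cluster):
{code:java}
public class LeaderAndIsrBlockingEstimate {
    public static void main(String[] args) {
        int n = 30; // brokers in the cluster (assumed value)
        int m = 4;  // num.replica.fetchers (assumed value)

        int fetcherThreads = (n - 1) * m;         // fetcher threads on one broker: 116
        int worstCaseBlocks = 3 * fetcherThreads; // remove + add + idle-shutdown checks: 348

        System.out.printf("fetcher threads=%d, worst-case partitionMapLock contentions=%d%n",
            fetcherThreads, worstCaseBlocks);
    }
}
{code}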





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2019-01-09 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16738587#comment-16738587
 ] 

Lucas Wang commented on KAFKA-7040:
---

Hi [~apovzner], thanks for the info. Please go ahead and take over that task. 
Thanks!

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4453) add request prioritization

2018-11-05 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-4453:
-

Assignee: Mayuresh Gharat  (was: Lucas Wang)

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Mayuresh Gharat
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests\[2\], and data loss (for some 
> unofficial\[3\] definition of data loss in terms of messages beyond the high 
> watermark)\[4\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
> metadata, causing rejected ProduceRequests. Based on how long the client acts 
> based on the stale metadata, the impact may or may not be visible to a 
> producer application. If the number of rejected ProduceRequests does not 
> exceed the max number of retries, the producer application would not be 
> impacted. On the other hand, if the retries are exhausted, the failed produce 
> will be visible to the producer application.
> \[3\] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[4\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7350) Improve or remove the PreferredReplicaImbalanceCount metric

2018-09-07 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-7350:
-

Assignee: Ahmed Al-Mehdi  (was: Lucas Wang)

> Improve or remove the PreferredReplicaImbalanceCount metric
> ---
>
> Key: KAFKA-7350
> URL: https://issues.apache.org/jira/browse/KAFKA-7350
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Ahmed Al-Mehdi
>Priority: Major
>
> In KAFKA-6753, we identified that in the ControllerEventManager, updating two 
> metrics after processing every controller event ends up consuming too much 
> CPU. The first metric OfflinePartitionCount is resolved in KAFKA-6753, and 
> this ticket is for tracking progress on the 2nd metric 
> PreferredReplicaImbalanceCount. 
> The options we have for this metric include:
> 1. Remove this metric, given that its value can be derived, if necessary, by 
> getting the metadata of all topics in the cluster.
> 2. Piggyback the update of the metric on each run of the auto leader 
> balancer. The benefit is keeping this metric; however, the downside is that 
> it may then be stale or incorrect depending on when it's checked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller

2018-08-27 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6753:
--
Description: 
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized; however, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller: one invocation after 
processing each event.

This ticket is used to track how we change the logic for the 
OfflinePartitionCount metric.
And KAFKA-7350 will be used to track progress for the 
PreferredReplicaImbalanceCount metric.



  was:
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when the is a high 
number of events to be processed by the controller, one invocation after 
processing any event.

This ticket is used to track how we change the logic for the 
OfflinePartitionCount metric.




> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can

[jira] [Created] (KAFKA-7350) Improve or remove the PreferredReplicaImbalanceCount metric

2018-08-27 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7350:
-

 Summary: Improve or remove the PreferredReplicaImbalanceCount 
metric
 Key: KAFKA-7350
 URL: https://issues.apache.org/jira/browse/KAFKA-7350
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


In KAFKA-6753, we identified that in the ControllerEventManager, updating two 
metrics after processing every controller event ends up consuming too much CPU. 
The first metric OfflinePartitionCount is resolved in KAFKA-6753, and this 
ticket is for tracking progress on the 2nd metric 
PreferredReplicaImbalanceCount. 

The options we have for this metric include:
1. Remove this metric, given that its value can be derived, if necessary, by 
getting the metadata of all topics in the cluster.
2. Piggyback the update of the metric on each run of the auto leader balancer. 
The benefit is keeping this metric; however, the downside is that it may then 
be stale or incorrect depending on when it's checked.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller

2018-08-27 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6753:
--
Description: 
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized; however, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller, one invocation after 
processing any event.

This ticket is used to track how we change the logic for the 
OfflinePartitionCount metric.



  was:
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized; however, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller, one invocation after 
processing any event.




> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make th

[jira] [Resolved] (KAFKA-6753) Speed up event processing on the controller

2018-08-27 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6753.
---
Resolution: Fixed

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-27 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594328#comment-16594328
 ] 

Lucas Wang commented on KAFKA-6753:
---

[~junrao] Thanks will do. 

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-08-23 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-7040:
-

Assignee: Lucas Wang

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-08-23 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16590634#comment-16590634
 ] 

Lucas Wang commented on KAFKA-7040:
---

[~lindong] Sure, I will own this ticket and work on the fix.

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reopened KAFKA-6753:
---

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16588186#comment-16588186
 ] 

Lucas Wang commented on KAFKA-6753:
---

[~junrao] Personally I'm in favor of removing this metric rather than giving a 
possibly incorrect/stale metric. I can start the KIP to collect more feedback. 
Meanwhile I'll keep this ticket open for tracking progress of the remaining 
work.

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587994#comment-16587994
 ] 

Lucas Wang commented on KAFKA-6753:
---

Hi [~junrao], thanks for reviewing the PR.
Besides the offlinePartitionCount metric, the preferredReplicaImbalanceCount 
metric is another contributing factor to the observed problem. Making a 
similar change for this metric seems like quite an involved effort since it depends on 
the set of partitions, the partition replica assignments, leadership changes, 
and topic deletion. Before diving into the details, (1) at least inside 
LinkedIn we don't use this metric at all, and (2) if needed, this count can be 
derived by requesting the metadata of all topics in the cluster. So I wonder 
whether it makes sense for us to remove this metric.

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is called many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller

2018-08-21 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6753:
--
Description: 
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized; however, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller, one invocation after 
processing any event.



  was:
The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
we see that updating metrics takes nearly 100% of the CPU for the controller 
event processing thread. Specifically the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized, we checked the CPU percentage of leaf nodes in the profiling result, 
and found that inside the loops of collection objects, e.g. the set of all 
partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is called many times when there is a high 
number of events to be processed by the controller, one invocation after 
processing any event.

The patch that will be submitted tries to fix bullet 2 above, i.e. reducing the 
number of invocations to update metrics. Instead of updating the metrics after 
processing any event, we only periodically check whether the metrics need to be 
updated, i.e. once every second (a minimal sketch follows the list below). 
* If after the previous invocation to update metrics, there are other types of 
events that changed the controller’s state, then one second later the metrics 
will be updated. 
* If after the previous invocation, there have been no other types of events, 
then the call to update metrics can be bypassed.
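
A minimal, illustrative sketch of this periodic dirty-flag approach (not the actual controller patch; the class and method names are made up for the example):
{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class RateLimitedMetricsUpdater {
    private final AtomicBoolean stateChanged = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public RateLimitedMetricsUpdater(Runnable expensiveMetricsUpdate) {
        // Check once per second; recompute the metrics only if some event
        // changed the controller's state since the previous recomputation.
        scheduler.scheduleAtFixedRate(() -> {
            if (stateChanged.getAndSet(false)) {
                expensiveMetricsUpdate.run();
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    // Called by the event processing loop after handling any state-changing event.
    public void markDirty() {
        stateChanged.set(true);
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
{code}
In this sketch the event processing loop would call markDirty() where it previously invoked the expensive metric computation directly.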




> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Fix For: 2.1.0
>
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partition

[jira] [Resolved] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model

2018-08-15 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6974?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6974.
---
Resolution: Won't Fix

> Changes the interaction between request handler threads and fetcher threads 
> into an ASYNC model
> ---
>
> Key: KAFKA-6974
> URL: https://issues.apache.org/jira/browse/KAFKA-6974
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> At LinkedIn, occasionally our clients complain about receiving constant 
> NotLeaderForPartition exceptions 
> Investigations:
> For one investigated case, the cluster was going through a rolling bounce. 
> And we saw there was an ~8 minute delay between an old partition leader 
> resigning and the new leader becoming active, based on entries of "Broker xxx 
> handling LeaderAndIsr request" in the state change log.
> Our monitoring shows the LeaderAndISR request local time during the incident 
> went up to ~4 minutes.
> Explanations:
> One possible explanation of the ~8 minutes of delay is:
> During controlled shutdown of a broker, the partitions whose leaders lie on 
> the shutting down broker need to go through leadership transitions. And the 
> controller processes partitions in batches with each batch having 
> config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
> If the 1st LeaderAndISR sent to a new leader broker takes too long, e.g. 4 
> minutes, then the subsequent LeaderAndISR requests can have an accumulated 
> delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that 
> subsequent LeaderAndISR requests are blocked in a muted channel, given only 
> one LeaderAndISR request can be processed at a time with a 
> maxInFlightRequestsPerConnection setting of 1. When that happens, no existing 
> metric would show the total delay of 8 or 12 minutes for muted requests.
> Now the question is why it took ~4 minutes for the 1st LeaderAndISR 
> request to finish.
> Explanation for the ~4 minutes of local time for LeaderAndISR request:
> During processing of an LeaderAndISR request, the request handler thread 
> needs to add partitions to or remove partitions from partitionStates field of 
> the ReplicaFetcherThread, also shutdown idle fetcher threads by checking the 
> size of the partitionStates field. On the other hand, background fetcher 
> threads need to iterate through all the partitions in partitionStates in 
> order to build fetch request, and process fetch responses. The 
> synchronization between request handler thread and the fetcher threads is 
> done through a partitionMapLock. 
> Specifically, the fetcher threads may acquire the partitionMapLock, and then 
> calls the following functions for processing the fetch response
> (1) processPartitionData, which in turn calls 
> (2) Replica.maybeIncrementLogStartOffset, which calls 
> (3) Log.maybeIncrementLogStartOffset, which calls 
> (4) LeaderEpochCache.clearAndFlushEarliest.
> Now two factors contribute to the long holding of the partitionMapLock,
> 1. function (4) above entails calling sync() to make sure data gets 
> persisted to the disk, which may potentially have a long latency
> 2. All the 4 functions above can potentially be called for each partition in 
> the fetch response, multiplying the sync() latency by a factor of n.
> The end result is that the request handler thread got blocked for a long time 
> trying to acquire the partitionMapLock of some fetcher inside 
> AbstractFetcherManager.shutdownIdleFetcherThreads since checking each 
> fetcher's partitionCount requires getting the partitionMapLock.
> In our testing environment, we reproduced the problem and confirmed the 
> explanation above with a request handler thread getting blocked for 10 
> seconds trying to acquire the partitionMapLock of one particular fetcher 
> thread, while there are many log entries showing "Incrementing log start 
> offset of partition..."
> Proposed change:
> We propose to change the interaction between the request handler threads and 
> the fetcher threads to an ASYNC model by using an event queue. All requests 
> to add or remove partitions, or shutdown idle fetcher threads are modeled as 
> items in the event queue. And only the fetcher threads can take items out of 
> the event queue and actually process them.
> In the new ASYNC model, in order to be able to process an infinite sequence 
> of FetchRequests, a fetcher thread initially has one FetchRequest, and after 
> it's done processing one FetchRequest, it enqueues one more into its own 
> event queue.
> Also since the current AbstractFetcherThread logic is inherited by both the 
> replica-fetcher-threads and the consumer-fetcher-threads for the old 
> consumer, and the latter has been deprecated, 

[jira] [Created] (KAFKA-7180) In testHWCheckpointWithFailuresSingleLogSegment, wait until server1 has joined the ISR before shutting down server2

2018-07-18 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7180:
-

 Summary: In testHWCheckpointWithFailuresSingleLogSegment, wait 
until server1 has joined the ISR before shutting down server2
 Key: KAFKA-7180
 URL: https://issues.apache.org/jira/browse/KAFKA-7180
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang
Assignee: Lucas Wang


In the testHWCheckpointWithFailuresSingleLogSegment method, the test logic is 
1. shut down server1 and then capture the leadership of a partition in the 
variable "leader", which should be server2
2. shut down server2 and wait until the leadership has changed to a broker other 
than server2
through the line 
waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, oldLeaderOpt = 
Some(leader))

However, when we execute step 2 and shut down server2, it's possible that server1 
has not caught up with the partition and has not joined the ISR. With unclean 
leader election turned off, the leadership cannot be transferred to server1, 
causing the condition waited on in step 2 to never be met. 

The obvious fix is to wait until server1 has joined the ISR before shutting 
down server2.
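
The actual test would use Kafka's Scala test utilities, but the waiting step can be sketched in isolation with the Java AdminClient. The method name, bootstrap address, and polling interval below are illustrative only:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.TopicPartitionInfo;

public class IsrWait {
    // Polls topic metadata until brokerId shows up in the ISR of the given
    // partition, or throws after timeoutMs.
    public static void waitForBrokerInIsr(String bootstrap, String topic, int partition,
                                          int brokerId, long timeoutMs) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
        long deadline = System.currentTimeMillis() + timeoutMs;
        try (AdminClient admin = AdminClient.create(props)) {
            while (System.currentTimeMillis() < deadline) {
                TopicPartitionInfo info = admin.describeTopics(Collections.singleton(topic))
                        .all().get().get(topic).partitions().stream()
                        .filter(p -> p.partition() == partition)
                        .findFirst()
                        .orElseThrow(IllegalStateException::new);
                if (info.isr().stream().anyMatch(node -> node.id() == brokerId)) {
                    return; // the broker has caught up and joined the ISR
                }
                Thread.sleep(100);
            }
        }
        throw new AssertionError("Broker " + brokerId + " did not join the ISR of "
                + topic + "-" + partition + " within " + timeoutMs + " ms");
    }
}
{code}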



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7162) Flaky unit tests caused by record creation timestamps differ from validation time by more than timestampDiffMaxMs

2018-07-13 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-7162:
-

Assignee: Lucas Wang

> Flaky unit tests caused by record creation timestamps differ from validation 
> time by more than timestampDiffMaxMs
> -
>
> Key: KAFKA-7162
> URL: https://issues.apache.org/jira/browse/KAFKA-7162
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> While running gradle unit tests, we found the test method 
> LogValidatorTest.testCompressedV1 can fail sometimes. Upon investigation, it 
> turns out the test method uses one set of timestamps, say t0, t1 and t2, for 
> the records, while using a separate timestamp, say t3, for the "now" 
> parameter when invoking the LogValidator.validateMessagesAndAssignOffsets 
> method. The validateMessagesAndAssignOffsets validation method also takes a 
> parameter timestampDiffMaxMs=1 second, that specifies the maximum allowed 
> time difference between t3 and the timestamps in records, i.e. t0, t1, and t2. 
> While running unit tests, especially when multiple tests are run 
> simultaneously, there is no guarantee that the time difference between t3 and 
> t0 is within 1 second, causing the test method to be flaky sometimes. Many other 
> test methods in the LogValidatorTest can suffer from the same problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7162) Flaky unit tests caused by record creation timestamps differ from validation time by more than timestampDiffMaxMs

2018-07-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7162:
-

 Summary: Flaky unit tests caused by record creation timestamps 
differ from validation time by more than timestampDiffMaxMs
 Key: KAFKA-7162
 URL: https://issues.apache.org/jira/browse/KAFKA-7162
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang


While running gradle unit tests, we found the test method 
LogValidatorTest.testCompressedV1 can fail sometimes. Upon investigation, it 
turns out the test method uses one set of timestamps, say t0, t1 and t2, for 
the records, while using a separate timestamp, say t3, for the "now" parameter 
when invoking the LogValidator.validateMessagesAndAssignOffsets method. The 
validateMessagesAndAssignOffsets validation method also takes a parameter 
timestampDiffMaxMs=1 second, that specifies the maximum allowed time difference 
between t3 and the timestamps in records, i.e. t0, t1, and t2. While running 
unit tests, especially when multiple tests are run simultaneously, there is no 
guarantee that the time difference between t3 and t0 is within 1 second, 
causing the test method to be flaky sometimes. Many other test methods in the 
LogValidatorTest can suffer from the same problem.
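
The principle behind a fix can be sketched as below; the class and constant names are illustrative stand-ins rather than the actual LogValidatorTest code. The idea is to derive every timestamp in the test from a single captured "now" so scheduling delays cannot push the difference past timestampDiffMaxMs.
{code:java}
public class SingleClockTestSketch {
    static final long TIMESTAMP_DIFF_MAX_MS = 1000L;

    static boolean withinAllowedDiff(long recordTimestamp, long now) {
        return Math.abs(now - recordTimestamp) <= TIMESTAMP_DIFF_MAX_MS;
    }

    public static void main(String[] args) {
        // Capture "now" once and reuse it both for the record timestamps and as
        // the validation time, instead of reading the clock again later.
        long now = System.currentTimeMillis();
        long[] recordTimestamps = {now, now + 1, now + 2};
        for (long ts : recordTimestamps) {
            if (!withinAllowedDiff(ts, now)) {
                throw new AssertionError("unexpected timestamp drift");
            }
        }
    }
}
{code}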



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-22 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16520899#comment-16520899
 ] 

Lucas Wang commented on KAFKA-7040:
---

Hi [~apovzner]
Sorry for the delayed response.

Regarding Case 1 above, I did some testing with min.insync.replicas == 2 and 
unclean leader election DISABLED, to check whether a real message loss can happen 
(see 
https://github.com/apache/kafka/compare/trunk...gitlw:test_incorrect_truncation_causing_message_loss
), and it turns out the answer is yes.
At a high level, if AFTER the truncation in step 5 broker0 again becomes the 
leader, and broker1 starts fetching from broker0, which has fewer messages, then 
the message will be lost forever.

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-15 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514422#comment-16514422
 ] 

Lucas Wang commented on KAFKA-7040:
---

Hi [~apovzner]

Thanks for your comment. I didn't see an actual message loss in a real cluster, 
and this is purely based on reasoning and unit testing.

In general, I think if truncation happens below the HW, it's possible for a 
message loss to happen. For example, after the truncation at step 5, leadership 
gets changed back to broker 0, and broker 1 becomes the follower. Then the 
message at offset 100 will forever be lost.

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508907#comment-16508907
 ] 

Lucas Wang commented on KAFKA-7040:
---

To be more specific, I think the following sequence of events may cause a 
truncation below HW.

Say currently both broker0 and broker1 have finished processing of a 
LeaderAndISR request with leader being broker1, and leader epoch 10, and both 
of them have 100 messages in their respective log (with the largest offset 99, 
and LEO of 100).

1. The replica fetcher thread on a broker0 issues a LeaderEpoch request with 
leader epoch 10 to broker1, and broker1 replies with a LEO of 100 given it's 
the latest leader epoch. Before the replica fetcher on broker0 acquires the 
AbstractFetcherThread.partitionMapLock and processes the LeaderEpoch response, 
it goes through steps 2-4 first.
2. A LeaderAndISR request causes broker0 to become the leader for one partition 
t1p0, which in turn will remove the partition t1p0 from the replica fetcher 
thread
3. Broker0 accepts one message at offset 100 from a producer, and the message 
gets replicated to broker1, causing the HW on broker0 to go up to 100.
4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 
to become the follower for partition t1p0. This will cause the partition t1p0 
to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 processes the LeaderEpoch response 
received in step 1, and truncates the accepted message with offset 100 in step3.

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7040?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508867#comment-16508867
 ] 

Lucas Wang commented on KAFKA-7040:
---

[~lindong] Thanks for pointing out the truncation may happen below high 
watermark.

Also your suggested change seems to work by remembering that some partition 
state has been used, so that we can distinguish
between different generations of the partition state.
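
A conceptual sketch of that idea (not the actual Kafka change; the names are made up): tag the partition fetch state with a generation that is bumped every time the partition is added to the fetcher, and ignore a LeaderEpoch response whose captured generation no longer matches.
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class GenerationTaggedFetchState {
    static final class PartitionFetchState {
        final long fetchOffset;
        final int generation; // bumped every time the partition is (re)added to the fetcher
        PartitionFetchState(long fetchOffset, int generation) {
            this.fetchOffset = fetchOffset;
            this.generation = generation;
        }
    }

    private final Map<String, PartitionFetchState> partitionStates = new ConcurrentHashMap<>();

    void maybeTruncate(String partition, PartitionFetchState stateAtRequestTime, long truncationOffset) {
        PartitionFetchState current = partitionStates.get(partition);
        // Drop the response if the partition was removed, or removed and re-added
        // (a different generation) while the LeaderEpoch request was in flight,
        // as in steps 2-4 of the sequence above.
        if (current == null || current.generation != stateAtRequestTime.generation) {
            return;
        }
        // Only now is it safe to truncate the local log to truncationOffset.
    }
}
{code}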

> The replica fetcher thread may truncate accepted messages during multiple 
> fast leadership transitions
> -
>
> Key: KAFKA-7040
> URL: https://issues.apache.org/jira/browse/KAFKA-7040
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> Consider the scenario where there are two brokers, broker0, and broker1, and 
> there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
> the leader and broker0 as the follower. The following sequence of events 
> happened on broker0
> 1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
> broker1, and awaits to get the response
> 2. A LeaderAndISR request causes broker0 to become the leader for one 
> partition t1p0, which in turn will remove the partition t1p0 from the replica 
> fetcher thread
> 3. Broker0 accepts some messages from a producer
> 4. A 2nd LeaderAndISR request causes broker1 to become the leader, and 
> broker0 to become the follower for partition t1p0. This will cause the 
> partition t1p0 to be added back to the replica fetcher thread on broker0.
> 5. The replica fetcher thread on broker0 receives a response for the 
> LeaderEpoch request issued in step 1, and truncates the accepted messages in 
> step3.
> The issue can be reproduced with the test from 
> https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea
> [1] Initially we set up broker0 to be the follower of two partitions instead 
> of just one, to avoid the shutting down of the replica fetcher thread when it 
> becomes idle.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7040) The replica fetcher thread may truncate accepted messages during multiple fast leadership transitions

2018-06-11 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-7040:
-

 Summary: The replica fetcher thread may truncate accepted messages 
during multiple fast leadership transitions
 Key: KAFKA-7040
 URL: https://issues.apache.org/jira/browse/KAFKA-7040
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang


Problem Statement:
Consider the scenario where there are two brokers, broker0, and broker1, and 
there are two partitions "t1p0", and "t1p1"[1], both of which have broker1 as 
the leader and broker0 as the follower. The following sequence of events 
happened on broker0

1. The replica fetcher thread on a broker0 issues a LeaderEpoch request to 
broker1, and awaits to get the response
2. A LeaderAndISR request causes broker0 to become the leader for one partition 
t1p0, which in turn will remove the partition t1p0 from the replica fetcher 
thread
3. Broker0 accepts some messages from a producer
4. A 2nd LeaderAndISR request causes broker1 to become the leader, and broker0 
to become the follower for partition t1p0. This will cause the partition t1p0 
to be added back to the replica fetcher thread on broker0.
5. The replica fetcher thread on broker0 receives a response for the 
LeaderEpoch request issued in step 1, and truncates the accepted messages in 
step3.

The issue can be reproduced with the test from 
https://github.com/gitlw/kafka/commit/8956e743f0e432cc05648da08c81fc1167b31bea

[1] Initially we set up broker0 to be the follower of two partitions instead of 
just one, to avoid the shutting down of the replica fetcher thread when it 
becomes idle.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-4453) add request prioritization

2018-06-05 Thread Lucas Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-4453:
--
Description: 
Today all requests (client requests, broker requests, controller requests) to a 
broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests in front of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata\[1\], rejecting 
ProduceRequests and FetchRequests\[2\], and data loss (for some unofficial\[3\] 
definition of data loss in terms of messages beyond the high watermark)\[4\].

We'd like to minimize the number of requests processed based on stale state. 
With request prioritization, controller requests get processed before regular 
queued up requests, so requests can get processed with up-to-date state.

\[1\] Say a client's MetadataRequest is sitting in front of a controller's 
UpdateMetadataRequest on a given broker's request queue. Suppose the 
MetadataRequest is for a topic whose partitions have recently undergone 
leadership changes and that these leadership changes are being broadcasted from 
the controller in the later UpdateMetadataRequest. Today the broker processes 
the MetadataRequest before processing the UpdateMetadataRequest, meaning the 
metadata returned to the client will be stale. The client will waste a 
roundtrip sending requests to the stale partition leader, get a 
NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
topic metadata again.
\[2\] Clients can issue ProduceRequests to the wrong broker based on stale 
metadata, causing rejected ProduceRequests. Based on how long the client acts 
based on the stale metadata, the impact may or may not be visible to a producer 
application. If the number of rejected ProduceRequests does not exceed the max 
number of retries, the producer application would not be impacted. On the other 
hand, if the retries are exhausted, the failed produce will be visible to the 
producer application.
\[3\] The official definition of data loss in kafka is when we lose a 
"committed" message. A message is considered "committed" when all in sync 
replicas for that partition have applied it to their log.
\[4\] Say a number of ProduceRequests are sitting in front of a controller's 
LeaderAndIsrRequest on a given broker's request queue. Suppose the 
ProduceRequests are for partitions whose leadership has recently shifted out 
from the current broker to another broker in the replica set. Today the broker 
processes the ProduceRequests before the LeaderAndIsrRequest, meaning the 
ProduceRequests are getting processed on the former partition leader. As part 
of becoming a follower for a partition, the broker truncates the log to the 
high-watermark. With weaker ack settings such as acks=1, the leader may 
successfully write to its own log, respond to the user with a success, process 
the LeaderAndIsrRequest making the broker a follower of the partition, and 
truncate the log to a point before the user's produced messages. So users have 
a false sense that their produce attempt succeeded while in reality their 
messages got erased. While technically part of what they signed up for with 
acks=1, it can still come as a surprise.
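
A conceptual sketch of the prioritization described above (not the actual broker implementation; the class and method names are illustrative): keep controller requests in their own queue and let request handler threads always drain that queue first.
{code:java}
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PrioritizedRequestChannel<R> {
    private final LinkedBlockingQueue<R> controllerQueue = new LinkedBlockingQueue<>();
    private final LinkedBlockingQueue<R> dataQueue = new LinkedBlockingQueue<>();

    public void sendControllerRequest(R request) { controllerQueue.add(request); }
    public void sendDataRequest(R request) { dataQueue.add(request); }

    // Request handler threads call this in a loop: a queued controller request is
    // always returned before any regular request; otherwise we wait briefly on the
    // data queue and then re-check the controller queue on the next iteration.
    public R receive(long pollMs) throws InterruptedException {
        R controllerRequest = controllerQueue.poll();
        if (controllerRequest != null) {
            return controllerRequest;
        }
        return dataQueue.poll(pollMs, TimeUnit.MILLISECONDS);
    }
}
{code}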

  was:
Today all requests (client requests, broker requests, controller requests) to a 
broker are put into the same queue. They all have the same priority. So a 
backlog of requests ahead of the controller request will delay the processing 
of controller requests. This causes requests in front of the controller request 
to get processed based on stale state.

Side effects may include giving clients stale metadata\[1\], rejecting 
ProduceRequests and FetchRequests, and data loss (for some unofficial\[2\] 
definition of data loss in terms of messages beyond the high watermark)\[3\].

We'd like to minimize the number of requests processed based on stale state. 
With request prioritization, controller requests get processed before regular 
queued up requests, so requests can get processed with up-to-date state.

\[1\] Say a client's MetadataRequest is sitting in front of a controller's 
UpdateMetadataRequest on a given broker's request queue. Suppose the 
MetadataRequest is for a topic whose partitions have recently undergone 
leadership changes and that these leadership changes are being broadcasted from 
the controller in the later UpdateMetadataRequest. Today the broker processes 
the MetadataRequest before processing the UpdateMetadataRequest, meaning the 
metadata returned to the client will be stale. The client will waste a 
roundtrip sending requests to the stale partition leader, get a 
NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
topic metadata again.
\[2\] The official defini

[jira] [Created] (KAFKA-6974) Changes the interaction between request handler threads and fetcher threads into an ASYNC model

2018-05-31 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6974:
-

 Summary: Changes the interaction between request handler threads 
and fetcher threads into an ASYNC model
 Key: KAFKA-6974
 URL: https://issues.apache.org/jira/browse/KAFKA-6974
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


Problem Statement:
At LinkedIn, occasionally our clients complain about receiving constant 
NotLeaderForPartition exceptions 

Investigations:
For one investigated case, the cluster was going through a rolling bounce. And 
we saw there was an ~8 minute delay between an old partition leader resigning 
and the new leader becoming active, based on entries of "Broker xxx handling 
LeaderAndIsr request" in the state change log.
Our monitoring shows the LeaderAndISR request local time during the incident 
went up to ~4 minutes.


Explanations:
One possible explanation of the ~8 minutes of delay is:
During controlled shutdown of a broker, the partitions whose leaders lie on the 
shutting down broker need to go through leadership transitions. And the 
controller processes partitions in batches with each batch having 
config.controlledShutdownPartitionBatchSize partitions, e.g. 100.
If the 1st LeaderAndISR sent to a new leader broker takes too long, e.g. 4 
minutes, then the subsequent LeaderAndISR requests can have an accumulated 
delay of maybe 4 minutes, 8 minutes, or even 12 minutes... The reason is that 
subsequent LeaderAndISR requests are blocked in a muted channel, given only one 
LeaderAndISR request can be processed at a time with a 
maxInFlightRequestsPerConnection setting of 1. When that happens, no existing 
metric would show the total delay of 8 or 12 minutes for muted requests.
Now the question is why it took ~4 minutes for the 1st LeaderAndISR request 
to finish.

Explanation for the ~4 minutes of local time for LeaderAndISR request:
During processing of an LeaderAndISR request, the request handler thread needs 
to add partitions to or remove partitions from partitionStates field of the 
ReplicaFetcherThread, also shutdown idle fetcher threads by checking the size 
of the partitionStates field. On the other hand, background fetcher threads 
need to iterate through all the partitions in partitionStates in order to build 
fetch request, and process fetch responses. The synchronization between request 
handler thread and the fetcher threads is done through a partitionMapLock. 
Specifically, the fetcher threads may acquire the partitionMapLock, and then 
calls the following functions for processing the fetch response
(1) processPartitionData, which in turn calls 
(2) Replica.maybeIncrementLogStartOffset, which calls 
(3) Log.maybeIncrementLogStartOffset, which calls 
(4) LeaderEpochCache.clearAndFlushEarliest.
Now two factors contribute to the long holding of the partitionMapLock,
1. function (4) above entails calling sync() to make sure data gets persisted 
to the disk, which may potentially have a long latency
2. All the 4 functions above can potentially be called for each partition in 
the fetch response, multiplying the sync() latency by a factor of n.

The end result is that the request handler thread got blocked for a long time 
trying to acquire the partitionMapLock of some fetcher inside 
AbstractFetcherManager.shutdownIdleFetcherThreads since checking each fetcher's 
partitionCount requires getting the partitionMapLock.

In our testing environment, we reproduced the problem and confirmed the 
explanation above with a request handler thread getting blocked for 10 seconds 
trying to acquire the partitionMapLock of one particular fetcher thread, while 
there are many log entries showing "Incrementing log start offset of 
partition..."

Proposed change:
We propose to change the interaction between the request handler threads and 
the fetcher threads to an ASYNC model by using an event queue. All requests to 
add or remove partitions, or shutdown idle fetcher threads are modeled as items 
in the event queue. And only the fetcher threads can take items out of the 
event queue and actually process them.
In the new ASYNC model, in order to be able to process an infinite sequence of 
FetchRequests, a fetcher thread initially has one FetchRequest, and after it's 
done processing one FetchRequest, it enqueues one more into its own event queue.
Also since the current AbstractFetcherThread logic is inherited by both the 
replica-fetcher-threads and the consumer-fetcher-threads for the old consumer, 
and the latter has been deprecated, we plan to implement the ASYNC model with a 
clean-slate approach, and only support the replica-fetcher-threads, in order to 
make the code cleaner.
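
A rough sketch of the proposed ASYNC interaction is shown below, with hypothetical event and class names (this is not the actual implementation): request handler threads only enqueue events, and the fetcher thread is the single consumer of its own event queue.

{code:java}
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Rough sketch of the ASYNC model described above (hypothetical names, not the
// actual Kafka implementation). Request handler threads only enqueue events;
// the fetcher thread is the sole consumer, so partitionStates needs no lock.
public class AsyncFetcher implements Runnable {
    interface Event {}
    static final class AddPartitions implements Event {
        final Set<String> partitions;
        AddPartitions(Set<String> partitions) { this.partitions = partitions; }
    }
    static final class RemovePartitions implements Event {
        final Set<String> partitions;
        RemovePartitions(Set<String> partitions) { this.partitions = partitions; }
    }
    static final class Fetch implements Event {}

    private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
    private final Set<String> partitionStates = new HashSet<>(); // touched only by the fetcher thread

    AsyncFetcher() {
        eventQueue.add(new Fetch()); // seed the first FetchRequest
    }

    // Called by request handler threads: non-blocking, just enqueue.
    void addPartitions(Set<String> partitions) { eventQueue.add(new AddPartitions(partitions)); }
    void removePartitions(Set<String> partitions) { eventQueue.add(new RemovePartitions(partitions)); }
    // A "shut down idle fetchers" check would be modeled as another event type.

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                Event event = eventQueue.take();
                if (event instanceof AddPartitions add) {
                    partitionStates.addAll(add.partitions);
                } else if (event instanceof RemovePartitions remove) {
                    partitionStates.removeAll(remove.partitions);
                } else if (event instanceof Fetch) {
                    // Build and process one FetchRequest for partitionStates here
                    // (in a real fetcher this blocks on the network), then enqueue
                    // the next Fetch so the sequence of FetchRequests never ends.
                    eventQueue.add(new Fetch());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}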




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-4453) add request prioritization

2018-05-15 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-4453:
-

Assignee: Lucas Wang  (was: Onur Karaman)

> add request prioritization
> --
>
> Key: KAFKA-4453
> URL: https://issues.apache.org/jira/browse/KAFKA-4453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Onur Karaman
>Assignee: Lucas Wang
>Priority: Major
>
> Today all requests (client requests, broker requests, controller requests) to 
> a broker are put into the same queue. They all have the same priority. So a 
> backlog of requests ahead of the controller request will delay the processing 
> of controller requests. This causes requests in front of the controller 
> request to get processed based on stale state.
> Side effects may include giving clients stale metadata\[1\], rejecting 
> ProduceRequests and FetchRequests, and data loss (for some unofficial\[2\] 
> definition of data loss in terms of messages beyond the high watermark)\[3\].
> We'd like to minimize the number of requests processed based on stale state. 
> With request prioritization, controller requests get processed before regular 
> queued up requests, so requests can get processed with up-to-date state.
> \[1\] Say a client's MetadataRequest is sitting in front of a controller's 
> UpdateMetadataRequest on a given broker's request queue. Suppose the 
> MetadataRequest is for a topic whose partitions have recently undergone 
> leadership changes and that these leadership changes are being broadcasted 
> from the controller in the later UpdateMetadataRequest. Today the broker 
> processes the MetadataRequest before processing the UpdateMetadataRequest, 
> meaning the metadata returned to the client will be stale. The client will 
> waste a roundtrip sending requests to the stale partition leader, get a 
> NOT_LEADER_FOR_PARTITION error, and will have to start all over and query the 
> topic metadata again.
> \[2\] The official definition of data loss in kafka is when we lose a 
> "committed" message. A message is considered "committed" when all in sync 
> replicas for that partition have applied it to their log.
> \[3\] Say a number of ProduceRequests are sitting in front of a controller's 
> LeaderAndIsrRequest on a given broker's request queue. Suppose the 
> ProduceRequests are for partitions whose leadership has recently shifted out 
> from the current broker to another broker in the replica set. Today the 
> broker processes the ProduceRequests before the LeaderAndIsrRequest, meaning 
> the ProduceRequests are getting processed on the former partition leader. As 
> part of becoming a follower for a partition, the broker truncates the log to 
> the high-watermark. With weaker ack settings such as acks=1, the leader may 
> successfully write to its own log, respond to the user with a success, 
> process the LeaderAndIsrRequest making the broker a follower of the 
> partition, and truncate the log to a point before the user's produced 
> messages. So users have a false sense that their produce attempt succeeded 
> while in reality their messages got erased. While technically part of what 
> they signed up for with acks=1, it can still come as a surprise.
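
One simple way to realize the prioritization described above is to keep a dedicated queue for controller requests and always drain it before the regular request queue. The sketch below is purely illustrative (hypothetical class and method names, not the broker's actual request channel code):

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Minimal sketch of request prioritization (an illustration, not broker code):
// controller requests go to their own queue and are always drained before
// regular data-plane requests.
public class PrioritizedRequestChannel<R> {
    private final BlockingQueue<R> controllerQueue = new LinkedBlockingQueue<>();
    private final BlockingQueue<R> dataQueue = new LinkedBlockingQueue<>();

    public void sendControllerRequest(R request) { controllerQueue.add(request); }
    public void sendDataRequest(R request) { dataQueue.add(request); }

    // Called by request handler threads: prefer controller requests so that
    // state-changing requests (LeaderAndIsr, UpdateMetadata) are not stuck
    // behind a backlog of produce/fetch requests.
    public R receive() throws InterruptedException {
        while (true) {
            R controllerRequest = controllerQueue.poll();
            if (controllerRequest != null) return controllerRequest;
            R dataRequest = dataQueue.poll(10, TimeUnit.MILLISECONDS);
            if (dataRequest != null) return dataRequest;
        }
    }
}
{code}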



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6753) Speed up event processing on the controller

2018-04-05 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6753:
-

 Summary: Speed up event processing on the controller 
 Key: KAFKA-6753
 URL: https://issues.apache.org/jira/browse/KAFKA-6753
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang
 Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png

The existing controller code updates metrics after processing every event. This 
can slow down event processing on the controller tremendously. In one profiling 
run we saw that updating metrics took nearly 100% of the CPU for the controller 
event processing thread. Specifically, the slowness can be attributed to two 
factors:
1. Each invocation to update the metrics is expensive. Specifically trying to 
calculate the offline partitions count requires iterating through all the 
partitions in the cluster to check if the partition is offline; and calculating 
the preferred replica imbalance count requires iterating through all the 
partitions in the cluster to check if a partition has a leader other than the 
preferred leader. In a large cluster, the number of partitions can be quite 
large, all seen by the controller. Even if the time spent to check a single 
partition is small, the accumulation effect of so many partitions in the 
cluster can make the invocation to update metrics quite expensive. One might 
argue that maybe the logic for processing each single partition is not 
optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
result and found that, inside the loops over collection objects, e.g. the set of 
all partitions, no single function dominates the processing. Hence the large number 
of the partitions in a cluster is the main contributor to the slowness of one 
invocation to update the metrics.
2. The invocation to update metrics is made many times when there is a high 
number of events to be processed by the controller, one invocation after 
processing each event.

The patch that will be submitted tries to fix bullet 2 above, i.e. reducing the 
number of invocations to update metrics. Instead of updating the metrics after 
processing every event, we only periodically check whether the metrics need to be 
updated, i.e. once every second, as sketched below. 
* If after the previous invocation to update metrics, there are other types of 
events that changed the controller’s state, then one second later the metrics 
will be updated. 
* If after the previous invocation, there have been no other such events, 
then the call to update metrics can be bypassed.
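
A minimal sketch of this dirty-flag-plus-periodic-update idea is shown below (illustrative names, not the actual controller code):

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch of "update metrics at most once per second, and only if state changed"
// (illustrative names, not the controller code).
public class ThrottledMetricsUpdater {
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    public ThrottledMetricsUpdater(Runnable expensiveMetricsUpdate) {
        // Once per second: recompute the metrics only if some event changed state.
        scheduler.scheduleAtFixedRate(() -> {
            if (dirty.compareAndSet(true, false)) {
                expensiveMetricsUpdate.run();
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    // Called after processing any event that changes controller state; cheap.
    public void markDirty() {
        dirty.set(true);
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }
}
{code}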





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6753) Speed up event processing on the controller

2018-04-05 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6753:
--
Attachment: Screen Shot 2018-04-04 at 7.08.55 PM.png

> Speed up event processing on the controller 
> 
>
> Key: KAFKA-6753
> URL: https://issues.apache.org/jira/browse/KAFKA-6753
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
> Attachments: Screen Shot 2018-04-04 at 7.08.55 PM.png
>
>
> The existing controller code updates metrics after processing every event. 
> This can slow down event processing on the controller tremendously. In one 
> profiling we see that updating metrics takes nearly 100% of the CPU for the 
> controller event processing thread. Specifically the slowness can be 
> attributed to two factors:
> 1. Each invocation to update the metrics is expensive. Specifically trying to 
> calculate the offline partitions count requires iterating through all the 
> partitions in the cluster to check if the partition is offline; and 
> calculating the preferred replica imbalance count requires iterating through 
> all the partitions in the cluster to check if a partition has a leader other 
> than the preferred leader. In a large cluster, the number of partitions can 
> be quite large, all seen by the controller. Even if the time spent to check a 
> single partition is small, the accumulation effect of so many partitions in 
> the cluster can make the invocation to update metrics quite expensive. One 
> might argue that maybe the logic for processing each single partition is not 
> optimized; however, we checked the CPU percentage of leaf nodes in the profiling 
> result, and found that inside the loops of collection objects, e.g. the set 
> of all partitions, no single function dominates the processing. Hence the 
> large number of the partitions in a cluster is the main contributor to the 
> slowness of one invocation to update the metrics.
> 2. The invocation to update metrics is made many times when there is a high 
> number of events to be processed by the controller, one invocation after 
> processing any event.
> The patch that will be submitted tries to fix bullet 2 above, i.e. reducing 
> the number of invocations to update metrics. Instead of updating the metrics 
> after processing any event, we only periodically check whether the metrics need 
> to be updated, i.e. once every second. 
> * If after the previous invocation to update metrics, there are other types 
> of events that changed the controller’s state, then one second later the 
> metrics will be updated. 
> * If after the previous invocation, there have been no other such events, 
> then the call to update metrics can be bypassed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info

2018-04-04 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6652.
---
Resolution: Won't Fix

> The controller should log failed attempts to transition a replica to 
> OfflineReplica state if there is no leadership info
> 
>
> Key: KAFKA-6652
> URL: https://issues.apache.org/jira/browse/KAFKA-6652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> In certain conditions, the controller's attempt to transition a replica to 
> OfflineReplica state could fail because there is no leadership info, e.g. the 
> condition described in 
> [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
> happens, there should be logs to indicate the failed state transitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion

2018-03-29 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang resolved KAFKA-6612.
---
Resolution: Fixed

> Added logic to prevent increasing partition counts during topic deletion
> 
>
> Key: KAFKA-6612
> URL: https://issues.apache.org/jira/browse/KAFKA-6612
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> Problem: trying to increase the partition count of a topic while the topic 
> deletion is in progress can cause the topic to never be deleted.
> In the current code base, if a topic deletion is still in progress and the 
> partition count is increased,
> the new partition and its replica assignment will be created on zookeeper as data 
> of the path /brokers/topics/.
> Upon detecting the change, the controller sees the topic is being deleted, 
> and therefore ignores the partition change. Therefore the zk path 
> /brokers/topics//partitions/ will NOT be created.
> If a controller switch happens next, the added partition will be detected by 
> the new controller and stored in the 
> controllerContext.partitionReplicaAssignment. The new controller then tries 
> to delete the topic by first transitioning its replicas to OfflineReplica. 
> However the transition to OfflineReplica state will NOT succeed since there 
> is no leader for the partition. Since the only state change path for a 
> replica to be successfully deleted is OfflineReplica -> 
> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter 
> the OfflineReplica state means the replica can never be successfully deleted.
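
Below is an illustrative sketch of the kind of guard described in the summary (hypothetical names, not the actual controller code): a partition-count increase is rejected while the topic is queued for deletion.

{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch (hypothetical names): reject a partition-count increase
// while the topic is queued for deletion, so the deletion state machine never
// sees newly added partitions that have no leadership info.
public class PartitionExpansionGuard {
    private final Set<String> topicsBeingDeleted = ConcurrentHashMap.newKeySet();

    public void markForDeletion(String topic) { topicsBeingDeleted.add(topic); }
    public void deletionCompleted(String topic) { topicsBeingDeleted.remove(topic); }

    public void validatePartitionIncrease(String topic, int newPartitionCount) {
        if (topicsBeingDeleted.contains(topic)) {
            throw new IllegalStateException(
                "Cannot increase partitions of topic " + topic + " to " + newPartitionCount
                + ": topic deletion is in progress");
        }
    }
}
{code}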



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info

2018-03-23 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6652:
--
Description: In certain conditions, the controller's attempt to transition 
a replica to OfflineReplica state could fail because there is no leadership 
info, e.g. the condition described in 
[KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
happens, there should be logs to indicate the failed state transitions.  (was: 
In certain conditions, the controller's attempt to transition a replica to 
OfflineReplica state could fail, e.g. the condition described in 
[KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
happens, there should be logs to indicate the failed state transitions.)

> The controller should log failed attempts to transition a replica to 
> OfflineReplica state if there is no leadership info
> 
>
> Key: KAFKA-6652
> URL: https://issues.apache.org/jira/browse/KAFKA-6652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> In certain conditions, the controller's attempt to transition a replica to 
> OfflineReplica state could fail because there is no leadership info, e.g. the 
> condition described in 
> [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
> happens, there should be logs to indicate the failed state transitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state if there is no leadership info

2018-03-23 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6652:
--
Summary: The controller should log failed attempts to transition a replica 
to OfflineReplica state if there is no leadership info  (was: The controller 
should log failed attempts to transition a replica to OfflineReplica state)

> The controller should log failed attempts to transition a replica to 
> OfflineReplica state if there is no leadership info
> 
>
> Key: KAFKA-6652
> URL: https://issues.apache.org/jira/browse/KAFKA-6652
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> In certain conditions, the controller's attempt to transition a replica to 
> OfflineReplica state could fail, e.g. the condition described in 
> [KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
> happens, there should be logs to indicate the failed state transitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6652) The controller should log failed attempts to transition a replica to OfflineReplica state

2018-03-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6652:
-

 Summary: The controller should log failed attempts to transition a 
replica to OfflineReplica state
 Key: KAFKA-6652
 URL: https://issues.apache.org/jira/browse/KAFKA-6652
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


In certain conditions, the controller's attempt to transition a replica to 
OfflineReplica state could fail, e.g. the condition described in 
[KAFKA-6650|https://issues.apache.org/jira/browse/KAFKA-6650]. When that 
happens, there should be logs to indicate the failed state transitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6650) The controller should be able to handle a partially deleted topic

2018-03-13 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6650:
-

 Summary: The controller should be able to handle a partially 
deleted topic
 Key: KAFKA-6650
 URL: https://issues.apache.org/jira/browse/KAFKA-6650
 Project: Kafka
  Issue Type: Bug
Reporter: Lucas Wang
Assignee: Lucas Wang


A previous controller could have deleted some partitions of a topic from ZK, 
but not all partitions, and then died.
In that case, the new controller should be able to handle the partially deleted 
topic, and finish the deletion.

In the current code base, if there is no leadership info for a replica's 
partition, the transition to OfflineReplica state for the replica will fail. 
Afterwards the transition to ReplicaDeletionStarted will fail as well since the 
only valid previous state for ReplicaDeletionStarted is OfflineReplica. 
Furthermore, it means the topic deletion will never finish.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6630) Speed up the processing of TopicDeletionStopReplicaResponseReceived events on the controller

2018-03-08 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6630:
--
Summary: Speed up the processing of 
TopicDeletionStopReplicaResponseReceived events on the controller  (was: Speed 
up the processing of StopReplicaResponse events on the controller)

> Speed up the processing of TopicDeletionStopReplicaResponseReceived events on 
> the controller
> 
>
> Key: KAFKA-6630
> URL: https://issues.apache.org/jira/browse/KAFKA-6630
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> Problem Statement:
> We find that in a large cluster with many partition replicas, it takes a long time 
> to successfully delete a topic. 
> Root cause:
> Further analysis shows that for a topic with N replicas, the controller 
> receives all the N StopReplicaResponses from brokers within a short time, 
> however sequentially handling all the N 
> TopicDeletionStopReplicaResponseReceived events one by one takes a long time.
> Specifically the functions triggered while handling every single 
> TopicDeletionStopReplicaResponseReceived event include:
> TopicDeletionStopReplicaResponseReceived.process calls 
> TopicDeletionManager.completeReplicaDeletion, which calls 
> TopicDeletionManager.resumeDeletions, which calls several inefficient 
> functions.
> The inefficient functions called inside TopicDeletionManager.resumeDeletions 
> include
> ReplicaStateMachine.areAllReplicasForTopicDeleted
> ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState
> ReplicaStateMachine.replicasInState
> Each of the 3 inefficient functions above will iterate through all the 
> replicas in the cluster, and filter out the replicas belonging to a topic. In 
> a large cluster with many replicas, these functions can be quite slow. 
> Total deletion time for a topic becomes long in single threaded controller 
> processing model:
> Since the controller needs to sequentially process the queued 
> TopicDeletionStopReplicaResponseReceived events, if the time cost to process 
> one event is t, the total time to process all events for all replicas of a 
> topic is N * t.
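
One way to avoid the repeated full scans described above is to keep replicas indexed by topic and state, so the checks become map lookups. The sketch below is illustrative only and assumes such an index (it is not the actual fix):

{code:java}
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch: index replicas by (topic, state) so that checks like
// "are all replicas of this topic deleted?" do not iterate over every replica
// in the cluster.
public class ReplicaStateIndex {
    public enum State { ONLINE, OFFLINE, DELETION_STARTED, DELETION_SUCCESSFUL }

    // topic -> state -> replicas currently in that state
    private final Map<String, Map<State, Set<String>>> byTopic = new HashMap<>();

    public void setState(String topic, String replica, State newState) {
        Map<State, Set<String>> states =
            byTopic.computeIfAbsent(topic, t -> new EnumMap<>(State.class));
        states.values().forEach(replicas -> replicas.remove(replica)); // drop old state
        states.computeIfAbsent(newState, s -> new HashSet<>()).add(replica);
    }

    public Set<String> replicasInState(String topic, State state) {
        return byTopic.getOrDefault(topic, Map.of()).getOrDefault(state, Set.of());
    }

    public boolean allReplicasDeleted(String topic, int replicaCount) {
        return replicasInState(topic, State.DELETION_SUCCESSFUL).size() == replicaCount;
    }
}
{code}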



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6630) Speed up the processing of StopReplicaResponse events on the controller

2018-03-08 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6630:
-

 Summary: Speed up the processing of StopReplicaResponse events on 
the controller
 Key: KAFKA-6630
 URL: https://issues.apache.org/jira/browse/KAFKA-6630
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Wang
Assignee: Lucas Wang


Problem Statement:
We find that in a large cluster with many partition replicas, it takes a long time 
to successfully delete a topic. 

Root cause:
Further analysis shows that for a topic with N replicas, the controller 
receives all the N StopReplicaResponses from brokers within a short time; 
however, sequentially handling all the N 
TopicDeletionStopReplicaResponseReceived events one by one takes a long time.

Specifically the functions triggered while handling every single 
TopicDeletionStopReplicaResponseReceived event include:
TopicDeletionStopReplicaResponseReceived.process calls 
TopicDeletionManager.completeReplicaDeletion, which calls 
TopicDeletionManager.resumeDeletions, which calls several inefficient functions.

The inefficient functions called inside TopicDeletionManager.resumeDeletions 
include
ReplicaStateMachine.areAllReplicasForTopicDeleted
ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState
ReplicaStateMachine.replicasInState

Each of the 3 inefficient functions above will iterate through all the replicas 
in the cluster, and filter out the replicas belonging to a topic. In a large 
cluster with many replicas, these functions can be quite slow. 

Total deletion time for a topic becomes long in single threaded controller 
processing model:
Since the controller needs to sequentially process the queued 
TopicDeletionStopReplicaResponseReceived events, if the time cost to process 
one event is t, the total time to process all events for all replicas of a 
topic is N * t.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion

2018-03-05 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-6612:
-

Assignee: Lucas Wang

> Added logic to prevent increasing partition counts during topic deletion
> 
>
> Key: KAFKA-6612
> URL: https://issues.apache.org/jira/browse/KAFKA-6612
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> Problem: trying to increase the partition count of a topic while the topic 
> deletion is in progress can cause the topic to never be deleted.
> In the current code base, if a topic deletion is still in progress and the 
> partition count is increased,
> the new partition and its replica assignment will be created on zookeeper as data 
> of the path /brokers/topics/.
> Upon detecting the change, the controller sees the topic is being deleted, 
> and therefore ignores the partition change. Therefore the zk path 
> /brokers/topics//partitions/ will NOT be created.
> If a controller switch happens next, the added partition will be detected by 
> the new controller and stored in the 
> controllerContext.partitionReplicaAssignment. The new controller then tries 
> to delete the topic by first transitioning its replicas to OfflineReplica. 
> However the transition to OfflineReplica state will NOT succeed since there 
> is no leader for the partition. Since the only state change path for a 
> replica to be successfully deleted is OfflineReplica -> 
> ReplicaDeletionStarted -> ReplicaDeletionSuccessful, not being able to enter 
> the OfflineReplica state means the replica can never be successfully deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6612) Added logic to prevent increasing partition counts during topic deletion

2018-03-05 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6612:
-

 Summary: Added logic to prevent increasing partition counts during 
topic deletion
 Key: KAFKA-6612
 URL: https://issues.apache.org/jira/browse/KAFKA-6612
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


Problem: trying to increase the partition count of a topic while the topic 
deletion is in progress can cause the topic to never be deleted.

In the current code base, if a topic deletion is still in progress and the 
partition count is increased,
the new partition and its replica assignment will be created on zookeeper as data of 
the path /brokers/topics/.
Upon detecting the change, the controller sees the topic is being deleted, and 
therefore ignores the partition change. Therefore the zk path 
/brokers/topics//partitions/ will NOT be created.

If a controller switch happens next, the added partition will be detected by 
the new controller and stored in the 
controllerContext.partitionReplicaAssignment. The new controller then tries to 
delete the topic by first transitioning its replicas to OfflineReplica. However 
the transition to OfflineReplica state will NOT succeed since there is no 
leader for the partition. Since the only state change path for a replica to be 
successfully deleted is OfflineReplica -> ReplicaDeletionStarted -> 
ReplicaDeletionSuccessful, not being able to enter the OfflineReplica state 
means the replica can never be successfully deleted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-08 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.
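
As an illustration of the intended change (hypothetical types and signature, not the actual ControllerChannelManager code), the beingDeleted flag can be derived by a membership check against partitionsToBeDeleted for only the given partitions, rather than by iterating over the whole set:

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative sketch (hypothetical types and signature): derive the
// beingDeleted flag with a membership check for each given partition, instead
// of iterating over the entire partitionsToBeDeleted set on every call.
public class UpdateMetadataBatchSketch {

    // partition -> beingDeleted flag, to be carried in the UpdateMetadata request
    static Map<String, Boolean> partitionInfoForBrokers(Set<String> partitions,
                                                        Set<String> partitionsToBeDeleted) {
        // O(|partitions|) work; partitionsToBeDeleted is only probed, never iterated.
        return partitions.stream()
                .collect(Collectors.toMap(p -> p, partitionsToBeDeleted::contains));
    }

    public static void main(String[] args) {
        Set<String> toBeDeleted = Set.of("a0-0", "a0-1");
        System.out.println(partitionInfoForBrokers(Set.of("a1-0", "a0-1"), toBeDeleted));
        // prints something like {a1-0=false, a0-1=true}
    }
}
{code}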

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4._Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 8. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which 
generates 10 logs, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n logs could be generated.

Performance improvement benchmark: if we perform the steps above with topic a0 
having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes 
down, it takes the controller ~4 minutes to go through controlled shutdown, 
detect the broker failure through zookeeper, and transition all replicas to 
OfflineReplica state. After applying the patch, the same process takes 23 
seconds.

The testing done:
After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
Also I've v

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-08 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4._Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 8. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which 
generates 10 logs, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n logs could be generated.

Performance improvement benchmark: if we perform the steps above with topic a0 
having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes 
down, it takes ~4 minutes for the controller to go through 
controlled shutdown, detect the broker failure through zookeeper, and 
transition all replicas to OfflineReplica state. After applying the patch, the 
same process takes 23 seconds.

The testing done:
After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
parti

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-08 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4._Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 8. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which 
generates 10 logs, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n logs could be generated.

Performance improvement benchmark: if we perform the steps above with topic a0 
having 5000 partitions, and topic a1 having 5000 partitions, when broker 2 goes 
down, it takes ~4 minutes for the controller to go through 
controlled shutdown, detect the broker failure through zookeeper, and 
transition all replicas to OfflineReplica state. After applying the patch, the 
same process takes 23 seconds.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've 

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4._Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 8. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which 
generates 10 logs, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, up to 2 
* m * n + n logs could be generated.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4._Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the 
partitionsToBeDeleted variable, and a new topic is created with m partitions, m 
* n log messages above will be generated.

 7. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 8. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which 
generates 10 logs, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable 
partitionsToBeDeleted, and a broker with m partitions becomes offline, 2 * m * 
n + n logs will be generated.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-del

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     _inside a for-loop for each partition_ 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4._Verify that the following log message shows up 10 times in the 
controller.log file, one line for each partition in topic a0: "Leader not yet 
assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_

5. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`

6. Verify that the log message in step 4 appears *100 more* times. This is 
because we have the following stack trace: 
addUpdateMetadataRequestForBrokers
addLeaderAndIsrRequestForBrokers
_inside a for-loop for each create response_   
initializeLeaderAndIsrForPartitions


 5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 6. Verify that the following log message in step 4 appears another *210* 
times. This is because
a. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
b. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) logs above.
   c. At the bottom of the function onReplicasBecomeOffline, it calls 
sendUpdateMetadataRequest (given the partitionsWithoutLeader is empty), which 
generates 10 logs, one for each partition in the a0 topic.

According to the analysis in step 6, when we have n partitions accumulated in 
the variable partitionsToBeDeleted, and a broker with m partitions becomes 
offline, 2 * m * n + n logs in step 4 will be generated.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead

[jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-02-07 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang updated KAFKA-6481:
--
Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
 With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the following call stack 
will happen on the controller, causing an iteration of the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
     inside a for-loop for each partition 
 ReplicaStateMachine.doHandleStateChanges
 ReplicaStateMachine.handleStateChanges
 KafkaController.onReplicasBecomeOffline
 KafkaController.onBrokerFailure

How to reproduce the problem:
 1. Create a cluster with 2 brokers having ids 1 and 2
 2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0
 4. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
 5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
 6. Verify that the following log message appears over 200 times in the 
controller.log file, one for each iteration of the a0 partitions:
 "Leader not yet assigned for partition [a0,..]. Skip sending 
UpdateMetadataRequest."

What happened was 
 1. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
 2. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) entries of the logs above.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
 Also I've verified that topic deletion for topic a1 still works fine.

  was:
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the call stack listed below 
will happen on the controller, causing an iteration over the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    C

[jira] [Assigned] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-01-24 Thread Lucas Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Wang reassigned KAFKA-6481:
-

Assignee: Lucas Wang

> Improving performance of the function 
> ControllerChannelManager.addUpdateMetadataRequestForBrokers
> -
>
> Key: KAFKA-6481
> URL: https://issues.apache.org/jira/browse/KAFKA-6481
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Minor
>
> The function ControllerChannelManager.addUpdateMetadataRequestForBrokers 
> should only process the partitions specified in the partitions parameter, 
> i.e. the 2nd parameter, and avoid iterating through the set of partitions in 
> TopicDeletionManager.partitionsToBeDeleted.
>  
> Here is why the current code can be a problem:
> The number of partitions-to-be-deleted stored in the field 
> TopicDeletionManager.partitionsToBeDeleted can become quite large under 
> certain scenarios. For instance, if a topic a0 has dead replicas, the topic 
> a0 would be marked as ineligible for deletion, and its partitions will be 
> retained in the field TopicDeletionManager.partitionsToBeDeleted for future 
> retries.
> With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
> if some replicas in another topic a1 need to be transitioned to 
> OfflineReplica state, possibly because of a broker going offline, the call 
> stack listed below will happen on the controller, causing an iteration 
> over the whole partitions-to-be-deleted set for every single affected partition.
>     controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition 
> => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
>     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
>     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
>     inside a for-loop for each partition 
> ReplicaStateMachine.doHandleStateChanges
> ReplicaStateMachine.handleStateChanges
> KafkaController.onReplicasBecomeOffline
> KafkaController.onBrokerFailure
> How to reproduce the problem:
> 1. Create a cluster with 2 brokers having ids 1 and 2
> 2. Create a topic having 10 partitions and deliberately assign the replicas 
> to non-existing brokers, i.e. 
>  ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
> --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`
> 3. Delete the topic and cause all of its partitions to be retained in the 
> field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead 
> replicas, and is ineligible for deletion.
> 4. Create another topic a1 also having 10 partitions, i.e.
>  ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
> --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
> 5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
> OfflineReplica state on the controller.
> 6. Verify that the following log message appears over 200 times in the 
> controller.log file, one for each iteration of the a0 partitions
>  "Leader not yet assigned for partition [a0,..]. Skip sending 
> UpdateMetadataRequest."
>  
>  What happened was 
>  1. During controlled shutdown, the function 
> KafkaController.doControlledShutdown calls 
> replicaStateMachine.handleStateChanges to transition all the replicas on 
> broker 2 to OfflineState. That in turn generates 100 (10 x 10) entries of the 
> logs above.
>  2. When the broker zNode is gone in ZK, the function 
> KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
> to transition all the replicas on broker 2 to OfflineState. And this again 
> generates 100 (10 x 10) entries of the logs above.
> After applying the patch in this RB, I've verified that by going through the 
> steps above, broker 2 going offline NO LONGER generates log entries for the 
> a0 partitions.
> Also I've verified that topic deletion for topic a1 still works fine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers

2018-01-24 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6481:
-

 Summary: Improving performance of the function 
ControllerChannelManager.addUpdateMetadataRequestForBrokers
 Key: KAFKA-6481
 URL: https://issues.apache.org/jira/browse/KAFKA-6481
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang


The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should 
only process the partitions specified in the partitions parameter, i.e. the 2nd 
parameter, and avoid iterating through the set of partitions in 
TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field 
TopicDeletionManager.partitionsToBeDeleted can become quite large under certain 
scenarios. For instance, if a topic a0 has dead replicas, the topic a0 would be 
marked as ineligible for deletion, and its partitions will be retained in the 
field TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, 
if some replicas in another topic a1 need to be transitioned to OfflineReplica 
state, possibly because of a broker going offline, the call stack listed below 
will happen on the controller, causing an iteration over the whole 
partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => 
updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
    inside a for-loop for each partition 
ReplicaStateMachine.doHandleStateChanges
ReplicaStateMachine.handleStateChanges
KafkaController.onReplicasBecomeOffline
KafkaController.onBrokerFailure


How to reproduce the problem:
1. Create a cluster with 2 brokers having ids 1 and 2
2. Create a topic having 10 partitions and deliberately assign the replicas to 
non-existing brokers, i.e. 
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 
--replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`

3. Delete the topic and cause all of its partitions to be retained in the field 
TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas, 
and is ineligible for deletion.
4. Create another topic a1 also having 10 partitions, i.e.
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 
--replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
5. Kill the broker 2 and cause the replicas on broker 2 to be transitioned to 
OfflineReplica state on the controller.
6. Verify that the following log message appears over 200 times in the 
controller.log file, one for each iteration of the a0 partitions
 "Leader not yet assigned for partition [a0,..]. Skip sending 
UpdateMetadataRequest."
 
 What happened was 
 1. During controlled shutdown, the function 
KafkaController.doControlledShutdown calls 
replicaStateMachine.handleStateChanges to transition all the replicas on broker 
2 to OfflineState. That in turn generates 100 (10 x 10) entries of the logs 
above.
 2. When the broker zNode is gone in ZK, the function 
KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline 
to transition all the replicas on broker 2 to OfflineState. And this again 
generates 100 (10 x 10) entries of the logs above.

After applying the patch in this RB, I've verified that by going through the 
steps above, broker 2 going offline NO LONGER generates log entries for the a0 
partitions.
Also I've verified that topic deletion for topic a1 still works fine.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)