This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push: new 4bdaa32bc7c [improve][broker] Reduce the consumers list sort by priority level (#16243) 4bdaa32bc7c is described below commit 4bdaa32bc7c3ccfef5fd6588286f0ebfc66f7dc7 Author: lipenghui <peng...@apache.org> AuthorDate: Tue Jun 28 09:11:47 2022 +0800 [improve][broker] Reduce the consumers list sort by priority level (#16243) ### Motivation While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will affect the message publish and subsequent consumer creation. ``` "pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207) - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ``` "pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000] java.lang.Thread.State: RUNNABLE at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360) at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234) at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307) at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896) at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888) - locked <0x00001000158237d8> (a java.lang.Object) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159) - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287) - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ### Modification - Sort the consumer list only if the new consumer with high priority than the last element in the consumer list, this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent). --- .../service/persistent/PersistentDispatcherMultipleConsumers.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 7e1bbc0317a..0a6cf8e02a9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -154,7 +154,10 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul } consumerList.add(consumer); - consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel)); + if (consumerList.size() > 1 + && consumer.getPriorityLevel() < consumerList.get(consumerList.size() - 2).getPriorityLevel()) { + consumerList.sort(Comparator.comparingInt(Consumer::getPriorityLevel)); + } consumerSet.add(consumer); }