This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e78d9f1ac54 PIP-105: Fix error on recycled SubscriptionPropertiesList
(#15335)
e78d9f1ac54 is described below
commit e78d9f1ac546c150f4068c148e5ffe95c2ddf1f9
Author: Enrico Olivelli <[email protected]>
AuthorDate: Wed Apr 27 09:47:54 2022 +0200
PIP-105: Fix error on recycled SubscriptionPropertiesList (#15335)
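Sometimes the CommandSubscribe object has already been released (recycled) by
the time the asynchronous topic lookup completes, so reading its subscription
properties inside the callback triggers this error: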
17:36:40.676 [bookkeeper-ml-scheduler-OrderedScheduler-3-0] WARN
org.apache.pulsar.broker.service.ServerCnx -
[/192.168.1.111:50688][persistent://public/default/test-cb4105f6-f850-4bdf-9e79-66d4ac42658c][13b9ee68-4ee4-4845-b955-77420b8b6a29]
Failed to create consumer: consumerId=0, refCnt: 0
java.util.concurrent.CompletionException:
io.netty.util.IllegalReferenceCountException: refCnt: 0
at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
~[?:?]
at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
~[?:?]
at
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1081)
~[?:?]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
~[?:?]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
~[?:?]
at
org.apache.pulsar.broker.service.BrokerService.lambda(BrokerService.java:1419)
~[pulsar-broker-2.10.0.jar:2.10.0]
at
java.util.concurrent.CompletableFuture.uniRunNow(CompletableFuture.java:815)
~[?:?]
at
java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:799)
~[?:?]
at
java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2121)
~[?:?]
at
org.apache.pulsar.broker.service.BrokerService.openLedgerComplete(BrokerService.java:1405)
~[pulsar-broker-2.10.0.jar:2.10.0]
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.lambda(ManagedLedgerFactoryImpl.java:425)
~[managed-ledger-2.10.0.jar:2.10.0]
at
java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714)
~[?:?]
at
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) ~[?:?]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
~[?:?]
at
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
~[?:?]
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl.initializeComplete(ManagedLedgerFactoryImpl.java:392)
~[managed-ledger-2.10.0.jar:2.10.0]
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:525)
~[managed-ledger-2.10.0.jar:2.10.0]
at
org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.operationComplete(ManagedLedgerImpl.java:515)
~[managed-ledger-2.10.0.jar:2.10.0]
at
org.apache.bookkeeper.mledger.impl.MetaStoreImpl.lambda(MetaStoreImpl.java:167)
~[managed-ledger-2.10.0.jar:2.10.0]
at
java.util.concurrent.CompletableFuture.tryFire2168(CompletableFuture.java:714)
[?:?]
at
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java) [?:?]
at
java.util.concurrent.CompletableFuture.run(CompletableFuture.java:478) [?:?]
at
org.apache.bookkeeper.common.util.OrderedExecutor.run(OrderedExecutor.java:203)
[bookkeeper-common-4.14.4.jar:4.14.4]
at java.util.concurrent.Executors.call(Executors.java:515) [?:?]
at java.util.concurrent.FutureTask.run2168(FutureTask.java:264)
[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:?]
at
java.util.concurrent.ScheduledThreadPoolExecutor.run(ScheduledThreadPoolExecutor.java:304)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[?:?]
at
java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628) [?:?]
at
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
[netty-common-4.1.74.Final.jar:4.1.74.Final]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
at
io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1383)
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.buffer.UnsafeByteBufUtil.getBytes(UnsafeByteBufUtil.java:481)
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.buffer.PooledUnsafeDirectByteBuf.getBytes(PooledUnsafeDirectByteBuf.java:130)
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.buffer.PooledSlicedByteBuf.getBytes(PooledSlicedByteBuf.java:235)
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at io.netty.buffer.ByteBufUtil.decodeString(ByteBufUtil.java:1270)
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at
io.netty.buffer.AbstractByteBuf.toString(AbstractByteBuf.java:1246)
~[netty-buffer-4.1.74.Final.jar:4.1.74.Final]
at
org.apache.pulsar.common.api.proto.LightProtoCodec.readString(LightProtoCodec.java:250)
~[pulsar-common-2.10.0.jar:2.10.0]
at
org.apache.pulsar.common.api.proto.KeyValue.getKey(KeyValue.java:19)
~[pulsar-common-2.10.0.jar:2.10.0]
at java.util.stream.Collectors.lambda(Collectors.java:1658) ~[?:?]
at
java.util.stream.ReduceOpsReducingSink.accept(ReduceOps.java:169) ~[?:?]
at java.util.ArrayList.forEachRemaining(ArrayList.java:1511) ~[?:?]
at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
~[?:?]
at
java.util.stream.ReduceOps.evaluateSequential(ReduceOps.java:913) ~[?:?]
at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
at
org.apache.pulsar.broker.service.SubscriptionOption.getPropertiesMap(SubscriptionOption.java:57)
~[pulsar-broker-2.10.0.jar:2.10.0]
at
org.apache.pulsar.broker.service.ServerCnx.lambda(ServerCnx.java:1047)
~[pulsar-broker-2.10.0.jar:2.10.0]
at
java.util.concurrent.CompletableFuture.tryFire(CompletableFuture.java:1072)
~[?:?]
... 28 more
---
.../java/org/apache/pulsar/broker/service/ServerCnx.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 00246e40f7d..0963e181f35 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1033,6 +1033,14 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
boolean createTopicIfDoesNotExist = forceTopicCreation
&&
service.isAllowAutoTopicCreation(topicName.toString());
+ final long consumerEpoch;
+ if (subscribe.hasConsumerEpoch()) {
+ consumerEpoch = subscribe.getConsumerEpoch();
+ } else {
+ consumerEpoch = DEFAULT_CONSUMER_EPOCH;
+ }
+ Optional<Map<String, String>> subscriptionProperties =
SubscriptionOption.getPropertiesMap(
+ subscribe.getSubscriptionPropertiesList());
service.getTopic(topicName.toString(),
createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
@@ -1054,10 +1062,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
new
SubscriptionNotFoundException(
"Subscription does not
exist"));
}
- long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
- if (subscribe.hasConsumerEpoch()) {
- consumerEpoch = subscribe.getConsumerEpoch();
- }
+
SubscriptionOption option =
SubscriptionOption.builder().cnx(ServerCnx.this)
.subscriptionName(subscriptionName)
.consumerId(consumerId).subType(subType).priorityLevel(priorityLevel)
@@ -1066,8 +1071,7 @@ public class ServerCnx extends PulsarHandler implements
TransportCnx {
.initialPosition(initialPosition)
.startMessageRollbackDurationSec(startMessageRollbackDurationSec)
.replicatedSubscriptionStateArg(isReplicated).keySharedMeta(keySharedMeta)
-
.subscriptionProperties(SubscriptionOption.getPropertiesMap(
-
subscribe.getSubscriptionPropertiesList()))
+
.subscriptionProperties(subscriptionProperties)
.consumerEpoch(consumerEpoch)
.build();
if (schema != null) {