This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 008452b32db9b3c258944587b8d3ddd86612ea2a Author: Ali Ahmed <[email protected]> AuthorDate: Wed Oct 20 13:16:24 2021 -0700 Add log error tracking for semaphore count leak (#12410) Co-authored-by: Ali Ahmed <[email protected]> (cherry picked from commit 7c219b11966d4eb8cc20111468c3439d23f8777c) --- .../apache/pulsar/client/impl/ProducerImpl.java | 31 +++++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 31a32da..f5165d7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -151,6 +151,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne private Optional<Long> topicEpoch = Optional.empty(); private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>(); + private boolean errorState; + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater .newUpdater(ProducerImpl.class, "msgIdGenerator"); @@ -261,6 +263,21 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne grabCnx(); } + protected void semaphoreRelease(final int releaseCountRequest) { + if (semaphore.isPresent()) { + if (!errorState) { + final int availablePermits = semaphore.get().availablePermits(); + if (availablePermits - releaseCountRequest < 0) { + log.error("Semaphore permit release count request greater then availablePermits" + + " : availablePermits={}, releaseCountRequest={}", + availablePermits, releaseCountRequest); + errorState = true; + } + } + semaphore.get().release(releaseCountRequest); + } + } + protected OpSendMsgQueue createPendingMessagesQueue() { return new OpSendMsgQueue(); } @@ -1022,9 +1039,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } private void releaseSemaphoreForSendOp(OpSendMsg op) { - if (semaphore.isPresent()) { - semaphore.get().release(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1); - } + + semaphoreRelease(isBatchMessagingEnabled() ? op.numMessagesInBatch : 1); + client.getMemoryLimitController().releaseMemory(op.uncompressedSize); } @@ -1778,7 +1795,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne }); pendingMessages.clear(); - semaphore.ifPresent(s -> s.release(releaseCount.get())); + semaphoreRelease(releaseCount.get()); if (batchMessagingEnabled) { failPendingBatchMessages(ex); } @@ -1804,7 +1821,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } final int numMessagesInBatch = batchMessageContainer.getNumMessagesInBatch(); batchMessageContainer.discard(ex); - semaphore.ifPresent(s -> s.release(numMessagesInBatch)); + semaphoreRelease(numMessagesInBatch); } @Override @@ -1847,9 +1864,9 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne processOpSendMsg(opSendMsg); } } catch (PulsarClientException e) { - semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch())); + semaphoreRelease(batchMessageContainer.getNumMessagesInBatch()); } catch (Throwable t) { - semaphore.ifPresent(s -> s.release(batchMessageContainer.getNumMessagesInBatch())); + semaphoreRelease(batchMessageContainer.getNumMessagesInBatch()); log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t); } }
