This is an automated email from the ASF dual-hosted git repository. chenhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 1399eeb Forget to update memory usage on message send timeout (#11761) 1399eeb is described below commit 1399eeb5ebdee7c96d9227fe037aae69326e334c Author: Shoothzj <shoot...@gmail.com> AuthorDate: Wed Aug 25 15:43:57 2021 +0800 Forget to update memory usage on message send timeout (#11761) ### Motivation The producer doesn't release the memory when a message send times out. ### Modifications When a message times out, release its memory too. ### Documentation We don't need to update docs, because it's a bug fix. ### Reproduce the problem - start pulsar server - start pulsar producer, which will send messages for 10 minutes (queueSize 0, memory limit 50MB) - close the pulsar server - wait for the 10 minutes to elapse - Look at the heap dump: it shows pendingMessages size is 0, but memory used is 52429824 --- .../client/impl/ProducerMemoryLimitTest.java | 79 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 1 + 2 files changed, 80 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java new file mode 100644 index 0000000..b6ec6a5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLimitTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import lombok.Cleanup; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SizeUnit; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +@Test(groups = "broker-impl") +public class ProducerMemoryLimitTest extends ProducerConsumerBase { + + @Override + @BeforeMethod + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterMethod(alwaysRun = true) + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 10_000) + public void testProducerTimeoutMemoryRelease() throws Exception { + initClientWithMemoryLimit(); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic("testProducerMemoryLimit") + .sendTimeout(5, TimeUnit.SECONDS) + .maxPendingMessages(0) + .enableBatching(false) + .create(); + this.stopBroker(); + try { + producer.send("memroy-test".getBytes(StandardCharsets.UTF_8)); + throw new IllegalStateException("can not reach here"); + } catch (PulsarClientException.TimeoutException ex) { + PulsarClientImpl clientImpl = (PulsarClientImpl) this.pulsarClient; + final MemoryLimitController memoryLimitController = 
clientImpl.getMemoryLimitController(); + Assert.assertEquals(memoryLimitController.currentUsage(), 0); + } + + } + + private void initClientWithMemoryLimit() throws PulsarClientException { + pulsarClient = PulsarClient.builder(). + serviceUrl(lookupUrl.toString()) + .memoryLimit(50, SizeUnit.KILO_BYTES) + .build(); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index f4f6c3c..439ceff 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1649,6 +1649,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topic, producerName, op.sequenceId, t); } + client.getMemoryLimitController().releaseMemory(op.uncompressedSize); ReferenceCountUtil.safeRelease(op.cmd); op.recycle(); });