This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d6fedd5df00967efdfa9d330b08ae1e73762bfb4 Author: fengyubiao <[email protected]> AuthorDate: Fri Dec 20 10:59:04 2024 +0800 [fix] [client] Fix memory leak when publishing encountered a corner case error (#23738) Co-authored-by: Yunze Xu <[email protected]> (cherry picked from commit ab1b5c00565adfe877719130127fc23ac9c5a0c1) --- .../pulsar/client/impl/ProducerMemoryLeakTest.java | 364 +++++++++++++++++++++ .../client/impl/BatchMessageContainerImpl.java | 4 +- .../apache/pulsar/client/impl/ProducerImpl.java | 54 ++- .../pulsar/client/impl/ProducerInterceptors.java | 12 +- 4 files changed, 417 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java new file mode 100644 index 00000000000..dcdfd136476 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerMemoryLeakTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mockStatic; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.interceptor.ProducerInterceptor; +import org.apache.pulsar.common.protocol.ByteBufPair; +import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; +import org.mockito.MockedStatic; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class ProducerMemoryLeakTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testSendQueueIsFull() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) 
pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(false).maxPendingMessages(1) + .enableBatching(true).topic(topicName).create(); + List<MsgPayloadTouchableMessageBuilder<String>> msgBuilderList = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + msgBuilderList.add(newMessage(producer)); + } + + CompletableFuture latestSendFuture = null; + for (MsgPayloadTouchableMessageBuilder<String> msgBuilder: msgBuilderList) { + latestSendFuture = msgBuilder.value("msg-1").sendAsync(); + } + try{ + latestSendFuture.join(); + } catch (Exception ex) { + // Ignore the error PulsarClientException$ProducerQueueIsFullError. + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.ProducerQueueIsFullError); + } + + // Verify: ref is expected. + producer.close(); + for (int i = 0; i < msgBuilderList.size(); i++) { + MsgPayloadTouchableMessageBuilder<String> msgBuilder = msgBuilderList.get(i); + assertEquals(msgBuilder.payload.refCnt(), 1); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + admin.topics().delete(topicName); + } + + /** + * The content size of msg (value "msg-1") is 5 bytes. + * Provides max message sizes of 1, 5 (no compression) and 6 (LZ4). + * 1: reaches the limit before the message metadata is added. + * 5 and 6: reach the limit after the message metadata is added.
+ */ + @DataProvider(name = "maxMessageSizeAndCompressions") + public Object[][] maxMessageSizeAndCompressions(){ + return new Object[][] { + {1, CompressionType.NONE}, + {5, CompressionType.NONE}, + {1, CompressionType.LZ4}, + {6, CompressionType.LZ4} + }; + } + + @Test(dataProvider = "maxMessageSizeAndCompressions") + public void testSendMessageSizeExceeded(int maxMessageSize, CompressionType compressionType) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .compressionType(compressionType) + .enableBatching(false) + .create(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + /** + * Mock an error: reached max message size, see more details {@link #maxMessageSizeAndCompressions()}. + */ + try (MockedStatic<ByteBufPair> theMock = mockStatic(ByteBufPair.class)) { + List<ByteBufPair> generatedByteBufPairs = Collections.synchronizedList(new ArrayList<>()); + theMock.when(() -> ByteBufPair.get(any(ByteBuf.class), any(ByteBuf.class))).then(invocation -> { + ByteBufPair byteBufPair = (ByteBufPair) invocation.callRealMethod(); + generatedByteBufPairs.add(byteBufPair); + byteBufPair.retain(); + return byteBufPair; + }); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that reached the max message size"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". 
+ producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + // Verify: ByteBufPair generated for Pulsar Command. + if (maxMessageSize == 1) { + assertEquals(generatedByteBufPairs.size(),0); + } else { + assertEquals(generatedByteBufPairs.size(),1); + if (compressionType == CompressionType.NONE) { + assertEquals(msgBuilder.payload.refCnt(), 2); + } else { + assertEquals(msgBuilder.payload.refCnt(), 1); + } + for (ByteBufPair byteBufPair : generatedByteBufPairs) { + assertEquals(byteBufPair.refCnt(), 1); + byteBufPair.release(); + assertEquals(byteBufPair.refCnt(), 0); + } + } + // Verify: message.payload + assertEquals(msgBuilder.payload.refCnt(), 1); + msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + } + + // cleanup. + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + /** + * The content size of msg (value "msg-1") is 5 bytes. + * Provides three max message sizes: 1, 3 and 26. + * 1: the limit is exceeded when adding the message into the batch container. + * 3: the limit is exceeded when building the batched messages payload. + * 26: equals the limit when building the batched messages payload, so no failure is expected.
+ */ + @DataProvider(name = "maxMessageSizes") + public Object[][] maxMessageSizes(){ + return new Object[][] { + {1}, + {3}, + {26} + }; + } + + @Test(dataProvider = "maxMessageSizes") + public void testBatchedSendMessageSizeExceeded(int maxMessageSize) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .enableBatching(true) + .compressionType(CompressionType.NONE) + .create(); + final ClientCnx cnx = producer.getClientCnx(); + producer.getConnectionHandler().setMaxMessageSize(maxMessageSize); + MsgPayloadTouchableMessageBuilder<String> msgBuilder1 = newMessage(producer); + MsgPayloadTouchableMessageBuilder<String> msgBuilder2 = newMessage(producer); + /** + * Mock an error: reached max message size. see more detail {@link #maxMessageSizes()}. + */ + msgBuilder1.value("msg-1").sendAsync(); + try { + msgBuilder2.value("msg-1").send(); + if (maxMessageSize != 26) { + fail("expected an error that reached the max message size"); + } + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.InvalidMessageException); + } + + // Verify: message payload has been released. + // Since "MsgPayloadTouchableMessageBuilder" has called "buffer.retain" once, "refCnt()" should be "1". + producer.close(); + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder1.payload.refCnt(), 1); + assertEquals(msgBuilder2.payload.refCnt(), 1); + + // cleanup. 
+ cnx.ctx().close(); + msgBuilder1.release(); + msgBuilder2.release(); + assertEquals(msgBuilder1.payload.refCnt(), 0); + assertEquals(msgBuilder2.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @Test + public void testSendAfterClosedProducer() throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = + (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName).create(); + // Publish after the producer was closed. + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + producer.close(); + try { + msgBuilder.value("msg-1").send(); + fail("expected an error that the producer has closed"); + } catch (Exception ex) { + assertTrue(FutureUtil.unwrapCompletionException(ex) + instanceof PulsarClientException.AlreadyClosedException); + } + + // Verify: message payload has been released. + Awaitility.await().untilAsserted(() -> { + assertEquals(producer.getPendingQueueSize(), 0); + }); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. 
+ msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + @DataProvider + public Object[][] failedInterceptAt() { + return new Object[][]{ + {"close"}, + {"eligible"}, + {"beforeSend"}, + {"onSendAcknowledgement"}, + }; + } + + @Test(dataProvider = "failedInterceptAt") + public void testInterceptorError(String method) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_"); + admin.topics().createNonPartitionedTopic(topicName); + ProducerImpl<String> producer = (ProducerImpl<String>) pulsarClient.newProducer(Schema.STRING).topic(topicName) + .intercept( + + new ProducerInterceptor() { + @Override + public void close() { + if (method.equals("close")) { + throw new RuntimeException("Mocked error"); + } + } + + @Override + public boolean eligible(Message message) { + if (method.equals("eligible")) { + throw new RuntimeException("Mocked error"); + } + return false; + } + + @Override + public Message beforeSend(Producer producer, Message message) { + if (method.equals("beforeSend")) { + throw new RuntimeException("Mocked error"); + } + return message; + } + + @Override + public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, + Throwable exception) { + if (method.equals("onSendAcknowledgement")) { + throw new RuntimeException("Mocked error"); + } + + } + }).create(); + + MsgPayloadTouchableMessageBuilder<String> msgBuilder = newMessage(producer); + try { + msgBuilder.value("msg-1").sendAsync().get(3, TimeUnit.SECONDS); + // It may throw error. + } catch (Exception ex) { + assertTrue(ex.getMessage().contains("Mocked")); + } + + // Verify: message payload has been released. + producer.close(); + assertEquals(msgBuilder.payload.refCnt(), 1); + + // cleanup. 
+ msgBuilder.release(); + assertEquals(msgBuilder.payload.refCnt(), 0); + admin.topics().delete(topicName); + } + + private <T> MsgPayloadTouchableMessageBuilder<T> newMessage(ProducerImpl<T> producer){ + return new MsgPayloadTouchableMessageBuilder<T>(producer, producer.schema); + } + + private static class MsgPayloadTouchableMessageBuilder<T> extends TypedMessageBuilderImpl { + + public volatile ByteBuf payload; + + public <T> MsgPayloadTouchableMessageBuilder(ProducerBase producer, Schema<T> schema) { + super(producer, schema); + } + + @Override + public Message<T> getMessage() { + MessageImpl<T> msg = (MessageImpl<T>) super.getMessage(); + payload = msg.getPayload(); + // Retain the msg to avoid it be reused by other task. + payload.retain(); + return msg; + } + + public void release() { + payload.release(); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java index 44f1fb27465..7262cfd11e0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java @@ -290,8 +290,8 @@ class BatchMessageContainerImpl extends AbstractBatchMessageContainer { messages.forEach(msg -> producer.client.getMemoryLimitController() .releaseMemory(msg.getUncompressedSize())); producer.client.getMemoryLimitController().releaseMemory(batchAllocatedSizeBytes); - discard(new PulsarClientException.InvalidMessageException( - "Message size is bigger than " + getMaxMessageSize() + " bytes")); + discard(new PulsarClientException.InvalidMessageException("Message size " + + encryptedPayload.readableBytes() + " is bigger than " + getMaxMessageSize() + " bytes")); return null; } messageMetadata.setNumMessagesInBatch(numMessagesInBatch); diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b686252b58a..10e0ee2ee3d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -32,6 +32,9 @@ import static org.apache.pulsar.common.protocol.Commands.readChecksum; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandler; +import io.netty.channel.ChannelPromise; import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; @@ -483,19 +486,46 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return compressedPayload; } + /** + * Note on ByteBuf Release Behavior. + * + * <p>If you have a customized callback, please ignore the note below.</p> + * + * <p>When using the default callback, please confirm that the {@code refCnt()} value of the {@code message} + * (as returned by {@link MessageImpl#getDataBuffer}) is {@code 2} when you call this method. This is because + * the {@code ByteBuf} will be released twice under the following conditions:</p> + * + * <ul> + * <li><b>Batch Messaging Enabled:</b> + * <ol> + * <li>Release 1: When the message is pushed into the batched message queue (see {@link #doBatchSendAndAdd}). 
+ * </li> + * <li>Release 2: In the method {@link SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li> + * </ol> + * </li> + * <li><b>Single Message (Batch Messaging Disabled):</b> + * <ol> + * <li>Release 1: When the message is written out by + * {@link ChannelOutboundHandler#write(ChannelHandlerContext, Object, ChannelPromise)}.</li> + * <li>Release 2: In the method {@link SendCallback#sendComplete(Throwable, OpSendMsgStats)}.</li> + * </ol> + * </li> + * </ul> + */ public void sendAsync(Message<?> message, SendCallback callback) { checkArgument(message instanceof MessageImpl); - - if (!isValidProducerState(callback, message.getSequenceId())) { - return; - } - MessageImpl<?> msg = (MessageImpl<?>) message; MessageMetadata msgMetadata = msg.getMessageBuilder(); ByteBuf payload = msg.getDataBuffer(); final int uncompressedSize = payload.readableBytes(); + if (!isValidProducerState(callback, message.getSequenceId())) { + payload.release(); + return; + } + if (!canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) { + payload.release(); return; } @@ -573,6 +603,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne for (int i = 0; i < (totalChunks - 1); i++) { if (!conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) { + compressedPayload.release(); client.getMemoryLimitController().releaseMemory(uncompressedSize); semaphoreRelease(i + 1); return; @@ -603,6 +634,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } if (chunkId > 0 && conf.isBlockIfQueueFull() && !canEnqueueRequest(callback, message.getSequenceId(), 0 /* The memory was already reserved */)) { + compressedPayload.release(); client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex); semaphoreRelease(totalChunks - chunkId); return; @@ -723,10 +755,13 @@ public class ProducerImpl<T> extends ProducerBase<T> implements 
TimerTask, Conne } else { // handle boundary cases where message being added would exceed // batch size and/or max message size - boolean isBatchFull = batchMessageContainer.add(msg, callback); - lastSendFuture = callback.getFuture(); - payload.release(); - triggerSendIfFullOrScheduleFlush(isBatchFull); + try { + boolean isBatchFull = batchMessageContainer.add(msg, callback); + lastSendFuture = callback.getFuture(); + triggerSendIfFullOrScheduleFlush(isBatchFull); + } finally { + payload.release(); + } } isLastSequenceIdPotentialDuplicated = false; } @@ -2304,6 +2339,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne batchMessageAndSend(false); } if (isMessageSizeExceeded(op)) { + op.cmd.release(); return; } pendingMessages.add(op); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java index 97f16c37b5d..38492ceae84 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java @@ -60,10 +60,10 @@ public class ProducerInterceptors implements Closeable { public Message beforeSend(Producer producer, Message message) { Message interceptorMessage = message; for (ProducerInterceptor interceptor : interceptors) { - if (!interceptor.eligible(message)) { - continue; - } try { + if (!interceptor.eligible(message)) { + continue; + } interceptorMessage = interceptor.beforeSend(producer, interceptorMessage); } catch (Throwable e) { if (producer != null) { @@ -93,10 +93,10 @@ public class ProducerInterceptors implements Closeable { */ public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId, Throwable exception) { for (ProducerInterceptor interceptor : interceptors) { - if (!interceptor.eligible(message)) { - continue; - } try { + if (!interceptor.eligible(message)) 
{ + continue; + } interceptor.onSendAcknowledgement(producer, message, msgId, exception); } catch (Throwable e) { log.warn("Error executing interceptor onSendAcknowledgement callback ", e);
