This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 8078dd098c ARTEMIS-4171 Messages leaking thorugh AMQP Delivery
8078dd098c is described below
commit 8078dd098ccc38cd57eefa3e94ba1dfe50c6865b
Author: Clebert Suconic <[email protected]>
AuthorDate: Mon Feb 20 13:18:18 2023 -0500
ARTEMIS-4171 Messages leaking thorugh AMQP Delivery
there are two leaks here:
* QueueImpl::delivery might create a new iterator if a delivery happens
right after a consumer was removed, and that iterator might belog to a consumer
that was already closed
as a result of that, the iterator may leak messages and hold
references until a reboot is done. I have seen scenarios where messages would
not be dleivered because of this.
* ProtonTransaction holding references: the last transaction might hold
messages in the memory longer than expected. In tests I have performed the
messages were accumulating in memory. and I cleared it here.
---
.../artemis/utils/collections/LinkedListImpl.java | 13 +++-
.../amqp/proton/AMQPConnectionContext.java | 11 +--
.../amqp/proton/ProtonServerSenderContext.java | 4 +-
.../proton/transaction/ProtonTransactionImpl.java | 42 +++++++----
.../activemq/artemis/core/server/Consumer.java | 4 +
.../core/server/impl/QueueConsumersImpl.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 21 ++++--
.../core/server/impl/ServerConsumerImpl.java | 16 +++-
.../artemis/tests/leak/ConnectionLeakTest.java | 86 ++++++++++++++++++++--
.../artemis/tests/leak/LinkedListMemoryTest.java | 71 ++++++++++++++++++
.../artemis/tests/leak/MemoryAssertions.java | 6 +-
11 files changed, 231 insertions(+), 45 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
index 5e36d333d2..121b5ba924 100644
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java
@@ -350,6 +350,9 @@ public class LinkedListImpl<E> implements LinkedList<E> {
@Override
public LinkedListIterator<E> iterator() {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Creating new iterator at", new Exception("trace
location"));
+ }
return new Iterator();
}
@@ -434,6 +437,9 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
private synchronized void removeIter(Iterator iter) {
+ if (logger.isTraceEnabled()) {
+ logger.trace("Removing iterator at", new Exception("trace location"));
+ }
for (int i = 0; i < numIters; i++) {
if (iter == iters[i]) {
iters[i] = null;
@@ -449,8 +455,10 @@ public class LinkedListImpl<E> implements LinkedList<E> {
if (numIters >= INITIAL_ITERATOR_ARRAY_SIZE && numIters ==
iters.length / 2) {
resize(numIters);
}
-
nextIndex--;
+ if (nextIndex < iters.length) {
+ iters[nextIndex] = null;
+ }
return;
}
@@ -515,8 +523,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
}
- private class Iterator implements LinkedListIterator<E> {
-
+ public class Iterator implements LinkedListIterator<E> {
Node<E> last;
Node<E> current = head.next;
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index a81f0d8c20..45ab664053 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -743,14 +743,9 @@ public class AMQPConnectionContext extends
ProtonInitializable implements EventH
}
}
- /// we have to perform the link.close after the linkContext.close is
finished.
- // linkeContext.close will perform a few executions on the netty loop,
- // this has to come next
- runLater(() -> {
- link.close();
- link.free();
- flush();
- });
+ link.close();
+ link.free();
+ flush();
}
@Override
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 1068e5c044..312b9bda8d 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -370,7 +370,9 @@ public class ProtonServerSenderContext extends
ProtonInitializable implements Pr
OperationContext oldContext = sessionSPI.recoverContext();
try {
- Message message = ((MessageReference)
delivery.getContext()).getMessage();
+ MessageReference reference = (MessageReference) delivery.getContext();
+ Message message = reference != null ? reference.getMessage() : null;
+
DeliveryState remoteState = delivery.getRemoteState();
if (remoteState != null && remoteState.getType() ==
DeliveryStateType.Accepted) {
diff --git
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
index 123dbb5d9b..0e3c4e63ea 100644
---
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
+++
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/transaction/ProtonTransactionImpl.java
@@ -45,26 +45,38 @@ public class ProtonTransactionImpl extends TransactionImpl {
deliveries have been settled. We also need to ensure we are settling on
the correct link. Hence why we keep a ref
to the ProtonServerSenderContext here.
*/
- private final Map<MessageReference, Pair<Delivery,
ProtonServerSenderContext>> deliveries = new HashMap<>();
+ final Map<MessageReference, Pair<Delivery, ProtonServerSenderContext>>
deliveries = new HashMap<>();
private boolean discharged;
+ private static class TXOperations extends TransactionOperationAbstract {
+ final ProtonTransactionImpl protonTransaction;
+ final AMQPConnectionContext connection;
+
+ TXOperations(AMQPConnectionContext connection, ProtonTransactionImpl tx)
{
+ this.protonTransaction = tx;
+ this.connection = connection;
+ }
+
+ @Override
+ public void afterCommit(Transaction tx) {
+ super.afterCommit(tx);
+ connection.runNow(() -> {
+ // Settle all unsettled deliveries if commit is successful
+ for (Pair<Delivery, ProtonServerSenderContext> p :
protonTransaction.deliveries.values()) {
+ if (!p.getA().isSettled())
+ p.getB().settle(p.getA());
+ }
+ connection.flush();
+ protonTransaction.deliveries.forEach((a, b) ->
b.getA().setContext(null));
+ protonTransaction.deliveries.clear();
+ });
+ }
+ }
+
public ProtonTransactionImpl(final Xid xid, final StorageManager
storageManager, final int timeoutSeconds, final AMQPConnectionContext
connection) {
super(xid, storageManager, timeoutSeconds);
- addOperation(new TransactionOperationAbstract() {
- @Override
- public void afterCommit(Transaction tx) {
- super.afterCommit(tx);
- connection.runNow(() -> {
- // Settle all unsettled deliveries if commit is successful
- for (Pair<Delivery, ProtonServerSenderContext> p :
deliveries.values()) {
- if (!p.getA().isSettled())
- p.getB().settle(p.getA());
- }
- connection.flush();
- });
- }
- });
+ addOperation(new TXOperations(connection, this));
}
@Override
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index f5178da111..7db22175da 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -53,6 +53,10 @@ public interface Consumer extends PriorityAware {
default void promptDelivery() {
}
+ default boolean isClosed() {
+ return false;
+ }
+
/**
* This will proceed with the actual delivery.
* Notice that handle should hold a readLock and proceedDelivery should
release the readLock
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
index e32bbccac6..7f16ceea3a 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java
@@ -93,9 +93,9 @@ public class QueueConsumersImpl<T extends PriorityAware>
implements QueueConsume
@Override
public boolean remove(T t) {
- iterator.removed(t);
boolean result = consumers.remove(t);
if (result) {
+ iterator.removed(t);
iterator.update(consumers.resettableIterator());
if (consumers.isEmpty()) {
reset();
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2aeb364588..738620b173 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -380,10 +380,11 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
boolean foundRef = false;
synchronized (this) {
- Iterator<MessageReference> iter = messageReferences.iterator();
- while (iter.hasNext()) {
- foundRef = true;
- out.println("reference = " + iter.next());
+ try (LinkedListIterator<MessageReference> iter =
messageReferences.iterator()) {
+ while (iter.hasNext()) {
+ foundRef = true;
+ out.println("reference = " + iter.next());
+ }
}
}
@@ -1483,7 +1484,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
logger.debug("Removing consumer {}", consumer);
try (ArtemisCloseable metric = measureCritical(CRITICAL_CONSUMER)) {
- synchronized (this) {
+ synchronized (QueueImpl.this) {
boolean consumerRemoved = false;
for (ConsumerHolder holder : consumers) {
@@ -3060,7 +3061,7 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
MessageReference ref;
Consumer handledconsumer = null;
- synchronized (this) {
+ synchronized (QueueImpl.this) {
if (queueDestroyed) {
if (messageReferences.size() == 0) {
@@ -3094,6 +3095,14 @@ public class QueueImpl extends CriticalComponentImpl
implements Queue {
Consumer consumer = holder.consumer;
Consumer groupConsumer = null;
+ // we remove the consumerHolder when the Consumer is closed
+ // however the QueueConsumerIterator may hold a reference until
the reset is called, which
+ // could happen a little later.
+ if (consumer.isClosed()) {
+ deliverAsync(true);
+ return false;
+ }
+
if (holder.iter == null) {
holder.iter = messageReferences.iterator();
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index a13d362af1..d4c7e32272 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -163,6 +163,11 @@ public class ServerConsumerImpl implements ServerConsumer,
ReadyListener {
private boolean isClosed = false;
+ @Override
+ public boolean isClosed() {
+ return isClosed;
+ }
+
ServerConsumerMetrics metrics = new ServerConsumerMetrics();
@@ -618,11 +623,14 @@ public class ServerConsumerImpl implements
ServerConsumer, ReadyListener {
server.callBrokerConsumerPlugins(plugin ->
plugin.afterCloseConsumer(this, failed));
}
- protocolContext = null;
+ messageQueue.getExecutor().execute(() -> {
+ protocolContext = null;
- callback = null;
+ callback = null;
+
+ session = null;
+ });
- session = null;
}
private void addLingerRefs() throws Exception {
@@ -1116,7 +1124,7 @@ public class ServerConsumerImpl implements
ServerConsumer, ReadyListener {
*/
@Override
public String toString() {
- return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ",
binding=" + binding + "]";
+ return "ServerConsumerImpl [id=" + id + ", filter=" + filter + ",
binding=" + binding + ", closed=" + isClosed + "]";
}
@Override
diff --git
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
index 9cbcda36f6..1af125e141 100644
---
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
+++
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ConnectionLeakTest.java
@@ -24,19 +24,23 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-
import java.lang.invoke.MethodHandles;
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
import
org.apache.activemq.artemis.core.protocol.core.impl.RemotingConnectionImpl;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.ServerConsumerImpl;
import org.apache.activemq.artemis.core.server.impl.ServerStatus;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
-import io.github.checkleak.core.CheckLeak;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.qpid.proton.engine.impl.DeliveryImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -81,7 +85,7 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
@Override
@Before
public void setUp() throws Exception {
- server = createServer(true, createDefaultConfig(1, true));
+ server = createServer(false, createDefaultConfig(1, true));
server.getConfiguration().setJournalPoolFiles(4).setJournalMinFiles(2);
server.start();
}
@@ -102,6 +106,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
}
private void doTest(String protocol) throws Exception {
+ CheckLeak checkLeak = new CheckLeak();
+ // Some protocols may create ServerConsumers
+ int originalConsumers =
checkLeak.getAllObjects(ServerConsumerImpl.class).length;
int REPEATS = 100;
int MESSAGES = 20;
basicMemoryAsserts();
@@ -143,12 +150,17 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
targetProducer.send(m);
}
Assert.assertNull(sourceConsumer.receiveNoWait());
+ consumerSession.commit();
+
+ Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
}
- consumerSession.commit();
}
}
}
+ assertMemory(new CheckLeak(), 0, ServerConsumerImpl.class.getName());
+
+
// this is just to drain the messages
try (Connection targetConnection = cf.createConnection(); Connection
consumerConnection = cf.createConnection()) {
Session targetSession = targetConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -160,6 +172,9 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
}
Assert.assertNull(consumer.receiveNoWait());
+ assertMemory(new CheckLeak(), 0, DeliveryImpl.class.getName());
+ Wait.assertTrue(() -> validateClosedConsumers(checkLeak));
+ consumer = null;
}
Queue sourceQueue = server.locateQueue("source");
@@ -173,6 +188,65 @@ public class ConnectionLeakTest extends ActiveMQTestBase {
}
basicMemoryAsserts();
+ }
+
+ @Test
+ public void testCheckIteratorsAMQP() throws Exception {
+ testCheckIterators("AMQP");
+ }
+
+ @Test
+ public void testCheckIteratorsOpenWire() throws Exception {
+ testCheckIterators("OPENWIRE");
+ }
+ @Test
+ public void testCheckIteratorsCORE() throws Exception {
+ testCheckIterators("CORE");
+ }
+
+ public void testCheckIterators(String protocol) throws Exception {
+ CheckLeak checkLeak = new CheckLeak();
+
+ String queueName = getName();
+
+ Queue queue = server.createQueue(new
QueueConfiguration(queueName).setRoutingType(RoutingType.ANYCAST));
+
+ ConnectionFactory cf = CFUtil.createConnectionFactory(protocol,
"tcp://localhost:61616");
+ for (int i = 0; i < 10; i++) {
+ Connection connection = cf.createConnection();
+ connection.start();
+ for (int j = 0; j < 10; j++) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageProducer producer =
session.createProducer(session.createQueue(queueName));
+ producer.send(session.createTextMessage("test"));
+ session.commit();
+ session.close();
+ }
+
+ for (int j = 0; j < 10; j++) {
+ Session session = connection.createSession(true,
Session.SESSION_TRANSACTED);
+ MessageConsumer consumer =
session.createConsumer(session.createQueue(queueName));
+ consumer.receiveNoWait(); // it doesn't matter if it received or
not, just doing something in the queue to kick the iterators
+ session.commit();
+ }
+ connection.close();
+
+ assertMemory(checkLeak, 0, 1, 1, ServerConsumerImpl.class.getName());
+ assertMemory(checkLeak, 0, 2, 1,
LinkedListImpl.Iterator.class.getName());
+ }
+ }
+
+
+ private boolean validateClosedConsumers(CheckLeak checkLeak) throws
Exception {
+ Object[] objecs = checkLeak.getAllObjects(ServerConsumerImpl.class);
+ for (Object obj : objecs) {
+ ServerConsumerImpl consumer = (ServerConsumerImpl) obj;
+ if (consumer.isClosed()) {
+ logger.info("References to closedConsumer {}\n{}", consumer,
checkLeak.exploreObjectReferences(3, 1, true, consumer));
+ return false;
+ }
+ }
+ return true;
}
}
\ No newline at end of file
diff --git
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java
new file mode 100644
index 0000000000..2663d94d0c
--- /dev/null
+++
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/LinkedListMemoryTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.leak;
+
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Random;
+
+import io.github.checkleak.core.CheckLeak;
+import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
+import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LinkedListMemoryTest {
+
+ private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ Random random = new Random();
+
+ CheckLeak checkLeak = new CheckLeak();
+
+ public int randomInt(int x, int y) {
+
+ int randomNumber = random.nextInt(y - x + 1) + x;
+
+ return randomNumber;
+ }
+
+ @Test
+ public void testRemoveIteratorsRandom() throws Exception {
+ LinkedListImpl<String> linkedList = new LinkedListImpl<>((a, b) ->
a.compareTo(b));
+
+ linkedList.addSorted("Test");
+
+ int iterators = 100;
+ ArrayList<LinkedListIterator> listIerators = new ArrayList();
+
+ for (int i = 0; i < iterators; i++) {
+ listIerators.add(linkedList.iterator());
+ }
+
+ int countRemoved = 0;
+
+ while (listIerators.size() > 0) {
+ int removeElement = randomInt(0, listIerators.size() - 1);
+ countRemoved++;
+ LinkedListIterator toRemove = listIerators.remove(removeElement);
+ toRemove.close();
+ toRemove = null;
+ MemoryAssertions.assertMemory(checkLeak, iterators - countRemoved,
LinkedListImpl.Iterator.class.getName());
+ }
+ MemoryAssertions.assertMemory(checkLeak, 0,
LinkedListImpl.Iterator.class.getName());
+ }
+}
diff --git
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
index 3e5eac93ea..47e4cab2b9 100644
---
a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
+++
b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java
@@ -49,6 +49,10 @@ public class MemoryAssertions {
}
public static void assertMemory(CheckLeak checkLeak, int maxExpected,
String clazz) throws Exception {
+ assertMemory(checkLeak, maxExpected, 10, 10, clazz);
+ }
+
+ public static void assertMemory(CheckLeak checkLeak, int maxExpected, int
maxLevel, int maxObjects, String clazz) throws Exception {
Wait.waitFor(() -> checkLeak.getAllObjects(clazz).length <= maxExpected,
5000, 100);
Object[] objects = checkLeak.getAllObjects(clazz);
@@ -56,7 +60,7 @@ public class MemoryAssertions {
for (Object obj : objects) {
logger.warn("Object {} still in the heap", obj);
}
- String report = checkLeak.exploreObjectReferences(10, 10, true,
objects);
+ String report = checkLeak.exploreObjectReferences(maxLevel,
maxObjects, true, objects);
logger.info(report);
Assert.fail("Class " + clazz + " has leaked " + objects.length + "
objects\n" + report);