This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3e6133ee014 [fix][broker] fix flaky test in SystemTopicBasedTopicPoliciesServiceTest (#25098)
3e6133ee014 is described below
commit 3e6133ee014b0195476e9e25a71194ed86f4bd18
Author: ken <[email protected]>
AuthorDate: Fri Jan 2 22:39:38 2026 +0800
[fix][broker] fix flaky test in SystemTopicBasedTopicPoliciesServiceTest (#25098)
Co-authored-by: fanjianye <[email protected]>
Co-authored-by: Lari Hotari <[email protected]>
---
.../SystemTopicBasedTopicPoliciesServiceTest.java | 70 +++++++---------------
.../org/apache/pulsar/utils/TestLogAppender.java | 21 +++++++
2 files changed, 43 insertions(+), 48 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
index 2f503e5512a..6b050e8b421 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java
@@ -28,7 +28,6 @@ import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertNull;
import static org.testng.AssertJUnit.assertTrue;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -43,10 +42,6 @@ import java.util.concurrent.Executors;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.FieldUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LogEvent;
-import org.apache.logging.log4j.core.Logger;
-import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -57,6 +52,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
+import org.apache.pulsar.utils.TestLogAppender;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
@@ -479,19 +475,12 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
@Test
public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
- // catch the log output in SystemTopicBasedTopicPoliciesService
- Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
- List<String> logMessages = new ArrayList<>();
- AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
- @Override
- public void append(LogEvent event) {
- logMessages.add(event.getMessage().getFormattedMessage());
- }
- };
- appender.start();
- logger.addAppender(appender);
+ // catch the log output
+ @Cleanup
+ TestLogAppender testLogAppender = TestLogAppender.create(log);
// create namespace-5 and topic
+ pulsar.getTopicPoliciesService().close();
SystemTopicBasedTopicPoliciesService spyService =
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
@@ -527,8 +516,8 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
reader.close();
log.info("successfully close spy reader");
Awaitility.await().untilAsserted(() -> {
- boolean logFound = logMessages.stream()
- .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ boolean logFound =
testLogAppender.getEvents().stream().anyMatch(logEvent ->
+ logEvent.getMessage().toString().contains("Closing the
topic policies reader for"));
assertTrue(logFound);
});
@@ -565,39 +554,28 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
// make sure not do cleanPoliciesCacheInitMap() twice
// totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanPoliciesCacheInitMap() is 2.
// in previous code, the time would be 3
- boolean logFound = logMessages.stream()
- .anyMatch(msg -> msg.contains("Failed to create reader on
__change_events topic"));
+ boolean logFound =
testLogAppender.getEvents().stream().anyMatch(logEvent ->
+ logEvent.getMessage().toString().contains("Failed to create
reader on __change_events topic"));
assertFalse(logFound);
- boolean logFound2 = logMessages.stream()
- .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic"));
+ boolean logFound2 =
testLogAppender.getEvents().stream().anyMatch(logEvent ->
+ logEvent.getMessage().toString().contains("Failed to check the
move events for the system topic"));
assertTrue(logFound2);
verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(),
anyBoolean());
// make sure not occur Recursive update
- boolean logFound3 = logMessages.stream()
- .anyMatch(msg -> msg.contains("Recursive update"));
+ boolean logFound3 =
testLogAppender.getEvents().stream().anyMatch(logEvent ->
+ logEvent.getMessage().toString().contains("Recursive update"));
assertFalse(logFound3);
-
- // clean log appender
- appender.stop();
- logger.removeAppender(appender);
}
@Test
public void
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws
Exception {
- // catch the log output in SystemTopicBasedTopicPoliciesService
- Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
- List<String> logMessages = new ArrayList<>();
- AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
- @Override
- public void append(LogEvent event) {
- logMessages.add(event.getMessage().getFormattedMessage());
- }
- };
- appender.start();
- logger.addAppender(appender);
+ // catch the log output
+ @Cleanup
+ TestLogAppender testLogAppender = TestLogAppender.create(log);
// create namespace-5 and topic
+ pulsar.getTopicPoliciesService().close();
SystemTopicBasedTopicPoliciesService spyService =
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
@@ -644,17 +622,13 @@ public class SystemTopicBasedTopicPoliciesServiceTest
extends MockedPulsarServic
// make sure not do cleanPoliciesCacheInitMap() twice
// totally trigger prepareInitPoliciesCacheAsync() once, so the time
of cleanPoliciesCacheInitMap() is 1.
- boolean logFound = logMessages.stream()
- .anyMatch(msg -> msg.contains("Failed to create reader on
__change_events topic"));
+ boolean logFound =
testLogAppender.getEvents().stream().anyMatch(logEvent ->
+ logEvent.getMessage().toString().contains("Failed to create
reader on __change_events topic"));
assertTrue(logFound);
- boolean logFound2 = logMessages.stream()
- .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic")
- || msg.contains("Failed to read event from the system
topic"));
+ boolean logFound2 =
testLogAppender.getEvents().stream().anyMatch(logEvent ->
+ logEvent.getMessage().toString().contains("Failed to check the
move events for the system topic")
+ || logEvent.getMessage().toString().contains("Failed
to read event from the system topic"));
assertFalse(logFound2);
verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(),
anyBoolean());
-
- // clean log appender
- appender.stop();
- logger.removeAppender(appender);
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
index cfb07913b53..a8fecaeb655 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/TestLogAppender.java
@@ -32,6 +32,7 @@ import
org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.logging.log4j.core.layout.PatternLayout;
+import org.slf4j.Logger;
/**
* Log4J appender that captures all log events for a specified logger.
@@ -57,6 +58,25 @@ public class TestLogAppender extends AbstractAppender
implements AutoCloseable {
context.updateLoggers();
return testAppender;
}
+ /**
+ * Create a new TestLogAppender for a given logger. Use the {@link
#close()} method to stop it and unregister it
+ * from Log4J.
+ * @param log The name of the logger instance will be used as the logger
name to register the appender to.
+ * @return return the new TestLogAppender instance.
+ */
+ public static TestLogAppender create(Logger log) {
+ return create(Optional.of(log.getName()));
+ }
+
+ /**
+ * Create a new TestLogAppender for a given class. Use the {@link
#close()} method to stop it and unregister it
+ * from Log4J.
+ * @param clazz The name of the class will be used as the logger name to
register the appender to.
+ * @return return the new TestLogAppender instance.
+ */
+ public static TestLogAppender create(Class<?> clazz) {
+ return create(Optional.of(clazz.getName()));
+ }
TestLogAppender(LoggerConfig loggerConfig, Runnable onConfigurationChange)
{
super("TestAppender" + idGenerator.incrementAndGet(), null,
PatternLayout.createDefaultLayout(), false, null);
@@ -64,6 +84,7 @@ public class TestLogAppender extends AbstractAppender
implements AutoCloseable {
this.onConfigurationChange = onConfigurationChange;
}
+
@Override
public void append(LogEvent event) {
events.add(event.toImmutable());