Copilot commented on code in PR #24978:
URL: https://github.com/apache/pulsar/pull/24978#discussion_r2527531597
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -532,4 +542,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new
SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanCacheAndCloseReader()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+ // Since cleanCacheAndCloseReader() is executed, should add the failed
reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+
+
+ // make sure not do cleanCacheAndCloseReader() twice
+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanCacheAndCloseReader() is 2.
+ // in previous code, the time would be 3
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("occur exception on reader of
__change_events topic"));
+ assertTrue(logFound);
+ boolean logFound2 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic"));
+ assertTrue(logFound2);
+ verify(spyService, times(2)).cleanCacheAndCloseReader(any(),
anyBoolean(), anyBoolean());
+
+ // make sure not occur Recursive update
+ boolean logFound3 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Recursive update"));
+ assertFalse(logFound3);
+
+ // clean log appender
+ appender.stop();
+ logger.removeAppender(appender);
+ }
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService =
+ Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and put a failed readerCreateFuture in readerCache.
+ // simulate that when trigger prepareInitPoliciesCacheAsync(),
+ // it would use this failed readerFuture and go into corresponding
logic
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture = new CompletableFuture<>();
+ readerCompletableFuture.completeExceptionally(new Exception("create
reader fail"));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // trigger prepareInitPoliciesCacheAsync()
+ CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
createReader,
+ // would clean readerCache and policyCacheInitMap.
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
Review Comment:
Using `Thread.sleep(500)` in tests is brittle and can lead to flaky test
behavior. The test already uses Awaitility in other places. Consider using
`Awaitility.await()` with appropriate conditions to verify that the cleanup
operations have completed instead of relying on a fixed sleep duration.
```suggestion
// Await cleanup operation to finish.
Awaitility.await()
.atMost(Duration.ofSeconds(2))
.untilAsserted(() -> {
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
Assert.assertNull(spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)));
Assert.assertNull(spyReaderCaches.get(NamespaceName.get(NAMESPACE5)));
});
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -532,4 +542,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new
SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanCacheAndCloseReader()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+ // Since cleanCacheAndCloseReader() is executed, should add the failed
reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture1 =
+ spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(readerCompletableFuture1);
+
+
+ // make sure not do cleanCacheAndCloseReader() twice
+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time
of cleanCacheAndCloseReader() is 2.
+ // in previous code, the time would be 3
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("occur exception on reader of
__change_events topic"));
+ assertTrue(logFound);
+ boolean logFound2 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Failed to check the move events
for the system topic"));
+ assertTrue(logFound2);
+ verify(spyService, times(2)).cleanCacheAndCloseReader(any(),
anyBoolean(), anyBoolean());
+
+ // make sure not occur Recursive update
+ boolean logFound3 = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Recursive update"));
+ assertFalse(logFound3);
+
+ // clean log appender
+ appender.stop();
+ logger.removeAppender(appender);
+ }
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
Review Comment:
The call to `logger.get().addAppender(appender, null, null)` appears
redundant with the following line `logger.addAppender(appender)`. Based on the
Log4j2 API, `Logger.get()` returns the underlying LoggerConfig, and the second
call to `logger.addAppender(appender)` should be sufficient. The first call
with three parameters (the third being null) is typically used on LoggerConfig
objects. Consider removing the first call to simplify the code and avoid
potential confusion.
```suggestion
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -532,4 +542,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
+ logger.addAppender(appender);
+
+ // create namespace-5 and topic
+ SystemTopicBasedTopicPoliciesService spyService = Mockito.spy(new
SystemTopicBasedTopicPoliciesService(pulsar));
+ FieldUtils.writeField(pulsar, "topicPoliciesService", spyService,
true);
+
+
+ admin.namespaces().createNamespace(NAMESPACE5);
+ final String topic = "persistent://" + NAMESPACE5 + "/test" +
UUID.randomUUID();
+ admin.topics().createPartitionedTopic(topic, 1);
+
+ CompletableFuture<Void> future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNull(future);
+
+ // mock readerCache and new a reader, then put this reader in
readerCache.
+ // when new reader, would trigger __change_event topic of namespace-5
created
+ // and would trigger prepareInitPoliciesCacheAsync()
+ ConcurrentHashMap<NamespaceName,
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
+ spyReaderCaches = new ConcurrentHashMap<>();
+ CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>
readerCompletableFuture =
+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ // set topic policy. create producer for __change_event topic
+ admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
+ future =
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
+ Assert.assertNotNull(future);
+
+ // trigger close reader of __change_event directly, simulate that
reader
+ // is closed for some reason, such as topic unload or broker restart.
+ // since prepareInitPoliciesCacheAsync() has been executed, it would
go into readMorePoliciesAsync(),
+ // throw exception, output "Closing the topic policies reader for" and
do cleanCacheAndCloseReader()
+ SystemTopicClient.Reader<PulsarEvent> reader =
readerCompletableFuture.get();
+ reader.close();
+ log.info("successfully close spy reader");
+ Awaitility.await().untilAsserted(() -> {
+ boolean logFound = logMessages.stream()
+ .anyMatch(msg -> msg.contains("Closing the topic policies
reader for"));
+ assertTrue(logFound);
+ });
+
+ // Since cleanCacheAndCloseReader() is executed, should add the failed
reader into readerCache again.
+ // Then in SystemTopicBasedTopicPoliciesService, readerCache has a
closed reader,
+ // and policyCacheInitMap do not contain a future.
+ // To simulate the situation: when getTopicPolicy() execute, it will
do prepareInitPoliciesCacheAsync() and
+ // use a closed reader to read the __change_event topic. Then throw
exception
+ spyReaderCaches.put(NamespaceName.get(NAMESPACE5),
readerCompletableFuture);
+ FieldUtils.writeDeclaredField(spyService, "readerCaches",
spyReaderCaches, true);
+
+ CompletableFuture<Void> prepareFuture = new CompletableFuture<>();
+ try {
+ prepareFuture =
spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
+ prepareFuture.get();
+ Assert.fail();
+ } catch (Exception e) {
+ // that is ok
+ }
+
+
+ // since prepareInitPoliciesCacheAsync() throw exception when
initPolicesCache(),
+ // would clean readerCache and policyCacheInitMap
+ // sleep 500ms to make sure clean operation finish.
+ Thread.sleep(500);
+ Assert.assertTrue(prepareFuture.isCompletedExceptionally());
Review Comment:
Using `Thread.sleep(500)` in tests is brittle and can lead to flaky test
behavior. The test already uses Awaitility in other places (line 595). Consider
using `Awaitility.await()` with appropriate conditions to verify that the
cleanup operations have completed instead of relying on a fixed sleep duration.
```suggestion
// Wait until cleanup operation finishes.
Awaitility.await()
.atMost(Duration.ofSeconds(2))
.until(() -> prepareFuture.isCompletedExceptionally()
&&
spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)) == null
&&
spyReaderCaches.get(NamespaceName.get(NAMESPACE5)) == null);
```
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java:
##########
@@ -532,4 +542,186 @@ public void
testCreateNamespaceEventsSystemTopicFactoryException() throws Except
Assert.assertNotNull(topicPolicies);
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
}
+
+ @Test
+ public void
testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws
Exception {
+ // catch the log output in SystemTopicBasedTopicPoliciesService
+ Logger logger = (Logger)
LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
+ List<String> logMessages = new ArrayList<>();
+ AbstractAppender appender = new AbstractAppender("TestAppender", null,
null) {
+ @Override
+ public void append(LogEvent event) {
+ logMessages.add(event.getMessage().getFormattedMessage());
+ }
+ };
+ appender.start();
+ logger.get().addAppender(appender, null, null);
Review Comment:
The call to `logger.get().addAppender(appender, null, null)` appears
redundant with the following line `logger.addAppender(appender)`. Based on the
Log4j2 API, `Logger.get()` returns the underlying LoggerConfig, and the second
call to `logger.addAppender(appender)` should be sufficient. The first call
with three parameters (the third being null) is typically used on LoggerConfig
objects. Consider removing the first call to simplify the code and avoid
potential confusion.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]