vinkal-chudgar commented on code in PR #24929:
URL: https://github.com/apache/pulsar/pull/24929#discussion_r2487520898
##########
pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java:
##########
@@ -873,36 +879,57 @@ public void testMsgDropStat() throws Exception {
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
+
+ final int threads = 10;
@Cleanup("shutdownNow")
- ExecutorService executor = Executors.newFixedThreadPool(10);
+ ExecutorService executor = Executors.newFixedThreadPool(threads);
byte[] msgData = "testData".getBytes();
- final int totalProduceMessages = 1000;
- CountDownLatch latch = new CountDownLatch(1);
- AtomicInteger messagesSent = new AtomicInteger(0);
- for (int i = 0; i < totalProduceMessages; i++) {
- executor.submit(() -> {
- try {
- MessageId msgId = producer.send(msgData);
- int count = messagesSent.incrementAndGet();
- // process at least 20% of messages before signalling
the latch
- // a non-persistent message will return entryId as -1
when it has been dropped
- // due to
setMaxConcurrentNonPersistentMessagePerConnection limit
- // also ensure that it has happened before the latch
is signalled
- if (count > totalProduceMessages * 0.2 && msgId != null
- && ((MessageIdImpl) msgId).getEntryId() == -1)
{
- latch.countDown();
+
+ /*
+ * Trigger at least one publisher drop through concurrent send()
calls.
+ *
+ * Uses CyclicBarrier to ensure all threads send simultaneously,
creating overlap.
+ * With maxConcurrentNonPersistentMessagePerConnection = 0,
ServerCnx#handleSend
+ * drops any send while another is in-flight, returning MessageId
with entryId = -1.
+ * Awaitility repeats whole bursts (bounded to 20s) until a drop
is observed.
+ */
+ AtomicBoolean publisherDropSeen = new AtomicBoolean(false);
+ Awaitility.await().atMost(Duration.ofSeconds(20)).until(() -> {
+ CyclicBarrier barrier = new CyclicBarrier(threads);
+ CountDownLatch completionLatch = new CountDownLatch(threads);
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ publisherDropSeen.set(false);
+
+ for (int i = 0; i < threads; i++) {
+ executor.submit(() -> {
+ try {
+ barrier.await();
+ MessageId msgId = producer.send(msgData);
+ // Publisher drop is signaled by
MessageIdImpl.entryId == -1
+ if (msgId instanceof MessageIdImpl &&
((MessageIdImpl) msgId).getEntryId() == -1) {
+ publisherDropSeen.set(true);
+ }
+ } catch (Throwable t) {
+ if (t instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ error.compareAndSet(null, t);
+ } finally {
+ completionLatch.countDown();
}
+ });
+ }
- Thread.sleep(10);
- } catch (PulsarClientException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- });
- }
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ // Wait for all sends to complete.
+ completionLatch.await();
Review Comment:
Agreed. Updated in bfcd6965f7e4f562dac148068fda724f05aed0f8.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]