codelipenghui commented on code in PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#discussion_r924315439


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -408,6 +412,110 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
         orderedExecutor.shutdown();
     }
 
+    /**
+     * The use of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for timed
+     * tasks in the original implementation caused this problem:
+     *   When the writer thread processes slowly, the scheduleAtFixedRate task will continue to append tasks to the
+     *   ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * This method is used to verify the fix for the above problem. see: https://github.com/apache/pulsar/pull/16679.
+     */
+    @Test
+    public void testPendingScheduleTriggerTaskCount() throws Exception {
+        // Create components.
+        String managedLedgerName = "-";
+        ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+        Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+        OrderedExecutor orderedExecutor =  Mockito.mock(OrderedExecutor.class);
+        ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(65536 * 2);
+        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue);
+        Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor);
+        ScheduledExecutorService scheduledExecutorService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+        SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+        // Count the number of tasks that have been submitted to bookie for later validation.
+        AtomicInteger completeFlushTaskCounter = new AtomicInteger();
+        Mockito.doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                completeFlushTaskCounter.incrementAndGet();
+                ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+                byteBuf.skipBytes(4);
+                AsyncCallbacks.AddEntryCallback callback =
+                        (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+                callback.addComplete(PositionImpl.get(1,1), byteBuf,
+                        invocation.getArguments()[2]);
+                return null;
+            }
+        }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+        // Start tests.
+        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,

Review Comment:
   Close the writer after the test.
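
One way to guarantee the cleanup is a `try`/`finally` around the test body. The following is only a minimal sketch: `FakeWriter` and `runAndCleanUp` are hypothetical stand-ins invented for illustration, not the real `TxnLogBufferedWriter` API, and the scheduler here is a plain JDK executor.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class CloseAfterTestSketch {

    /** Hypothetical stand-in for the writer under test; not the real TxnLogBufferedWriter. */
    static class FakeWriter implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Runs a test body, then releases the writer and the scheduler even if the body throws. */
    static boolean runAndCleanUp(FakeWriter writer, Runnable testBody) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            testBody.run();
        } finally {
            // Cleanup always runs, so no timer task or thread outlives the test.
            writer.close();
            scheduler.shutdownNow();
        }
        // True only if the scheduler terminated and the writer was closed.
        return scheduler.awaitTermination(1, TimeUnit.SECONDS) && writer.closed.get();
    }
}
```

In the real test the same pattern would also cover the executors created at the top of the method, so that a failed assertion does not leak threads into later tests.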



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -137,14 +139,35 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
         this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
         this.flushContext = FlushContext.newInstance();
         this.dataArray = new ArrayList<>();
+        this.scheduledExecutorService = scheduledExecutorService;
         // scheduler task.
-        if (batchEnabled) {
-            this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
-                    batchedWriteMaxDelayInMillis, batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+        if (this.batchEnabled) {
+            nextTimingTrigger();
         }
         this.state = State.OPEN;
     }
 
+    /***
+     * Why not use {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} ?
+     * Because: when the {@link #singleThreadExecutorForWrite} thread processes slowly, the scheduleAtFixedRate task
+     * will continue to append tasks to the ledger thread, this burdens the ledger thread and leads to an avalanche.
+     * see: https://github.com/apache/pulsar/pull/16679.
+     */
+    private void nextTimingTrigger(){
+        try {
+            if (state == State.CLOSING || state == State.CLOSED){
+                return;
+            }
+            scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
+                    batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);

Review Comment:
   ```suggestion
               scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
                       batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
   ```
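
To show why the unit matters: a value named `...InMillis` passed with `TimeUnit.MICROSECONDS` fires 1000 times sooner than intended. The sketch below uses only JDK classes. `DelayUnitSketch`, `effectiveDelayNanos`, and `runSelfRescheduling` are hypothetical names for illustration; the second method only mimics the one-shot `schedule()` re-arming pattern and is not the PR's `nextTimingTrigger()`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DelayUnitSketch {

    /** Returns the delay in nanoseconds as the executor would interpret it for the given unit. */
    static long effectiveDelayNanos(long delay, TimeUnit unit) {
        return unit.toNanos(delay);
    }

    /**
     * Runs a task a fixed number of times, re-arming a one-shot schedule() after each run.
     * At most one trigger is pending at any time, unlike scheduleAtFixedRate.
     */
    static boolean runSelfRescheduling(int runs, long delayMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch remaining = new CountDownLatch(runs);
        Runnable[] task = new Runnable[1];
        task[0] = () -> {
            remaining.countDown();
            if (remaining.getCount() > 0) {
                // Re-arm only after the current run has finished.
                scheduler.schedule(task[0], delayMillis, TimeUnit.MILLISECONDS);
            }
        };
        try {
            scheduler.schedule(task[0], delayMillis, TimeUnit.MILLISECONDS);
            return remaining.await(5, TimeUnit.SECONDS);
        } finally {
            scheduler.shutdownNow();
        }
    }
}
```

With a configured delay of 1, `effectiveDelayNanos(1, TimeUnit.MILLISECONDS)` is 1,000,000 ns while `effectiveDelayNanos(1, TimeUnit.MICROSECONDS)` is only 1,000 ns, which is the mismatch the suggestion fixes.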



