codelipenghui commented on code in PR #16679:
URL: https://github.com/apache/pulsar/pull/16679#discussion_r924315439
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -408,6 +412,110 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
orderedExecutor.shutdown();
}
+ /**
+ * The use of {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} for timed
+ * tasks in the original implementation caused this problem:
+ * When the writer thread processes slowly, the scheduleAtFixedRate task will continue to append tasks to the
+ * ledger thread, this burdens the ledger thread and leads to an avalanche.
+ * This method is used to verify the fix for the above problem. see: https://github.com/apache/pulsar/pull/16679.
+ */
+ @Test
+ public void testPendingScheduleTriggerTaskCount() throws Exception {
+ // Create components.
+ String managedLedgerName = "-";
+ ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class);
+ Mockito.when(managedLedger.getName()).thenReturn(managedLedgerName);
+ OrderedExecutor orderedExecutor = Mockito.mock(OrderedExecutor.class);
+ ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(65536 * 2);
+ ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue);
+ Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor);
+ ScheduledExecutorService scheduledExecutorService =
+ Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
+ SumStrDataSerializer dataSerializer = new SumStrDataSerializer();
+ // Count the number of tasks that have been submitted to bookie for later validation.
+ AtomicInteger completeFlushTaskCounter = new AtomicInteger();
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ completeFlushTaskCounter.incrementAndGet();
+ ByteBuf byteBuf = (ByteBuf)invocation.getArguments()[0];
+ byteBuf.skipBytes(4);
+ AsyncCallbacks.AddEntryCallback callback =
+ (AsyncCallbacks.AddEntryCallback) invocation.getArguments()[1];
+ callback.addComplete(PositionImpl.get(1,1), byteBuf,
+ invocation.getArguments()[2]);
+ return null;
+ }
+ }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
+ // Start tests.
+ TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
Review Comment:
Close the writer after the test.
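A minimal sketch of the cleanup pattern meant here, assuming the writer holds a pending timer task that must be cancelled. `StubWriter` and `WriterCleanupSketch` are hypothetical stand-ins for illustration, not the real `TxnLogBufferedWriter` API:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class WriterCleanupSketch {

    /** Hypothetical stand-in for the writer under test; it owns one pending timer task. */
    static final class StubWriter implements AutoCloseable {
        private final ScheduledFuture<?> timer;

        StubWriter(ScheduledExecutorService scheduler) {
            // A far-future task, so it is still pending when close() is called.
            this.timer = scheduler.schedule(() -> { }, 1, TimeUnit.HOURS);
        }

        @Override
        public void close() {
            // Cancel the pending timer so nothing keeps firing after the test.
            timer.cancel(false);
        }

        boolean timerCancelled() {
            return timer.isCancelled();
        }
    }

    /** Runs a (placeholder) test body and always releases the writer and the scheduler. */
    static boolean runAndCleanUp() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        StubWriter writer = new StubWriter(scheduler);
        try {
            // ... test assertions would go here ...
        } finally {
            writer.close();
            scheduler.shutdownNow();
        }
        return writer.timerCancelled() && scheduler.isShutdown();
    }
}
```

Putting the cleanup in `finally` keeps a failed assertion from leaking the scheduler thread into later tests.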
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -137,14 +139,35 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
this.batchedWriteMaxDelayInMillis = batchedWriteMaxDelayInMillis;
this.flushContext = FlushContext.newInstance();
this.dataArray = new ArrayList<>();
+ this.scheduledExecutorService = scheduledExecutorService;
// scheduler task.
- if (batchEnabled) {
- this.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(() -> trigFlush(false),
- batchedWriteMaxDelayInMillis, batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
+ if (this.batchEnabled) {
+ nextTimingTrigger();
}
this.state = State.OPEN;
}
+ /***
+ * Why not use {@link ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} ?
+ * Because: when the {@link #singleThreadExecutorForWrite} thread processes slowly, the scheduleAtFixedRate task
+ * will continue to append tasks to the ledger thread, this burdens the ledger thread and leads to an avalanche.
+ * see: https://github.com/apache/pulsar/pull/16679.
+ */
+ private void nextTimingTrigger(){
+ try {
+ if (state == State.CLOSING || state == State.CLOSED){
+ return;
+ }
+ scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
+ batchedWriteMaxDelayInMillis, TimeUnit.MICROSECONDS);
Review Comment:
```suggestion
scheduledFuture = scheduledExecutorService.schedule(() -> trigFlush(false, true),
batchedWriteMaxDelayInMillis, TimeUnit.MILLISECONDS);
```
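The field is named `batchedWriteMaxDelayInMillis`, so passing `TimeUnit.MICROSECONDS` makes the timer fire 1000 times sooner than configured. The sketch below illustrates the one-shot, re-arming pattern this change moves to, using the millisecond unit. It is a simplified illustration under stated assumptions: the class `SelfReschedulingFlushSketch` and its members are hypothetical, and the flush runs directly on the scheduler thread rather than being handed to a separate writer thread as in the PR.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SelfReschedulingFlushSketch {
    private final ScheduledExecutorService scheduler;
    private final long maxDelayMillis;
    private final Runnable flush;
    // Number of timer tasks currently scheduled or running, and the highest value seen.
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicInteger maxPending = new AtomicInteger();
    private volatile boolean closed;

    SelfReschedulingFlushSketch(ScheduledExecutorService scheduler, long maxDelayMillis, Runnable flush) {
        this.scheduler = scheduler;
        this.maxDelayMillis = maxDelayMillis;
        this.flush = flush;
    }

    void start() {
        armNext();
    }

    void close() {
        closed = true;
    }

    int maxPending() {
        return maxPending.get();
    }

    /** Schedules exactly one future flush; the next one is armed only after this one finishes. */
    private void armNext() {
        if (closed) {
            return;
        }
        int now = pending.incrementAndGet();
        maxPending.accumulateAndGet(now, Math::max);
        try {
            scheduler.schedule(() -> {
                try {
                    flush.run();
                } finally {
                    pending.decrementAndGet();
                    armNext();
                }
            }, maxDelayMillis, TimeUnit.MILLISECONDS); // unit matches the "InMillis" name
        } catch (RejectedExecutionException e) {
            // The scheduler was shut down; stop re-arming.
            pending.decrementAndGet();
        }
    }
}
```

Because the next trigger is armed only after the previous flush returns, a slow flush delays later triggers instead of letting them queue up, which is the behavior `testPendingScheduleTriggerTaskCount` is meant to verify.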