liangyepianzhou commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925127257


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java:
##########
@@ -94,17 +97,23 @@ public void testManagedLedgerMetrics() throws Exception {
 
     @Test
     public void testTransactionTopic() throws Exception {
+        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(true);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
-                .initialize().join();
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                scheduledExecutorService);
+        mlTransactionLog.initialize().join();
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
         metrics.generate();
+        // cleanup.
+        mlTransactionLog.closeAsync();

Review Comment:
   ```suggestion
           mlTransactionLog.closeAsync();
           scheduledExecutorService.shutdown();
   ```
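
A possible, more defensive variant of the cleanup suggested above, as a minimal sketch only. It assumes the asynchronous close may still need the executor, so it waits for the close to finish and shuts the executor down in a `finally` block. The class `ExecutorCleanupSketch` and its `closeAsync` helper are hypothetical stand-ins built on the JDK alone; they are not Pulsar APIs and do not reproduce the behavior of `MLTransactionLogImpl`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class ExecutorCleanupSketch {

    /** Hypothetical stand-in for a resource whose close completes asynchronously. */
    static CompletableFuture<Void> closeAsync(ScheduledExecutorService executor) {
        CompletableFuture<Void> closed = new CompletableFuture<>();
        // Simulate a close that finishes slightly later on the shared executor.
        executor.schedule(() -> closed.complete(null), 50, TimeUnit.MILLISECONDS);
        return closed;
    }

    /** Returns true if the executor terminated after the close completed. */
    static boolean runAndCleanUp() throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        try {
            // ... the body of the test would go here ...
            // Wait for the asynchronous close before stopping the threads it may rely on.
            closeAsync(executor).get(5, TimeUnit.SECONDS);
        } finally {
            // Always release the threads, even if the test body or the close fails.
            executor.shutdown();
        }
        return executor.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println("terminated cleanly: " + runAndCleanUp());
    }
}
```

In the test under review, the same shape would wrap the existing statements in `try`/`finally`, with `mlTransactionLog.closeAsync()` in place of the stand-in helper.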



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");
                 transactionLog.append(transactionMetadataEntry)
                         .whenComplete((position, throwable) -> {
+                            System.out.println(id + " end");

Review Comment:
   ```suggestion
                               if (log.isDebugEnabled()) {
                                   log.debug("Transaction coordinator [{}] complete to open transaction [{}]",
                                           txnID.getMostSigBits(), txnID.getLeastSigBits());
                               }
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");

Review Comment:
   ```suggestion
                   if (log.isDebugEnabled()) {
                       log.debug("Transaction coordinator [{}] start to open transaction [{}]",
                               txnID.getMostSigBits(), txnID.getLeastSigBits());
                   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
