liangyepianzhou commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925127257
##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java:
##########
@@ -94,17 +97,23 @@ public void testManagedLedgerMetrics() throws Exception {
@Test
public void testTransactionTopic() throws Exception {
+ TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new
TxnLogBufferedWriterConfig();
+ txnLogBufferedWriterConfig.setBatchEnabled(true);
+ ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(2);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"),
Sets.newHashSet("test")));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
createTransactionCoordinatorAssign();
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
- new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
- pulsar.getManagedLedgerFactory(), managedLedgerConfig)
- .initialize().join();
+ MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+ pulsar.getManagedLedgerFactory(), managedLedgerConfig,
txnLogBufferedWriterConfig,
+ scheduledExecutorService);
+ mlTransactionLog.initialize().join();
ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
metrics.generate();
+ // cleanup.
+ mlTransactionLog.closeAsync();
Review Comment:
```suggestion
mlTransactionLog.closeAsync();
scheduledExecutorService.shutdown();
```
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long
timeOut) {
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ String id = UUID.randomUUID().toString();
Review Comment:
```suggestion
```
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long
timeOut) {
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ String id = UUID.randomUUID().toString();
+ System.out.println(id + " start");
transactionLog.append(transactionMetadataEntry)
.whenComplete((position, throwable) -> {
+ System.out.println(id + " end");
Review Comment:
```suggestion
if (log.isDebugEnabled()) {
log.debug("Transaction coordinator [{}]
complete to open transaction [{}]",
txnID.getMostSigBits(),
txnID.getLeastSigBits());
}
```
##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long
timeOut) {
.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
.setLastModificationTime(currentTimeMillis)
.setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+ String id = UUID.randomUUID().toString();
+ System.out.println(id + " start");
Review Comment:
```suggestion
if (log.isDebugEnabled()) {
log.debug("Transaction coordinator [{}] start to open
transaction [{}]",
txnID.getMostSigBits(), txnID.getLeastSigBits());
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]