codelipenghui commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926344917


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -66,13 +74,24 @@ public class MLTransactionLogImpl implements TransactionLog 
{
 
     private final TopicName topicName;
 
+    private TxnLogBufferedWriter<TransactionMetadataEntry> bufferedWriter;
+
+    private final ScheduledExecutorService scheduledExecutorService;
+
+    private final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig;
+
     public MLTransactionLogImpl(TransactionCoordinatorID tcID,
                                 ManagedLedgerFactory managedLedgerFactory,
-                                ManagedLedgerConfig managedLedgerConfig) {
+                                ManagedLedgerConfig managedLedgerConfig,
+                                TxnLogBufferedWriterConfig 
txnLogBufferedWriterConfig,
+                                ScheduledExecutorService 
scheduledExecutorService) {
         this.topicName = getMLTransactionLogName(tcID);
         this.tcId = tcID.getId();
         this.managedLedgerFactory = managedLedgerFactory;
         this.managedLedgerConfig = managedLedgerConfig;
+        this.scheduledExecutorService = scheduledExecutorService;
+        this.txnLogBufferedWriterConfig = txnLogBufferedWriterConfig;
+        this.managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(true);

Review Comment:
   We only need to enable the batch index level ack while enabling the 
transaction log batch.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf 
buffer){

Review Comment:
   Do we need to release the buffer after deserializing?



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and 
others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   ```suggestion
       void setAckSetByIndex()
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf 
buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);
+            BatchedTransactionMetadataEntry batchedLog = new 
BatchedTransactionMetadataEntry();
+            batchedLog.parseFrom(buffer, buffer.readableBytes());
+            return batchedLog.getTransactionLogsList();
+        } else {
+            TransactionMetadataEntry log = new TransactionMetadataEntry();
+            log.parseFrom(buffer, buffer.readableBytes());
+            return Collections.singletonList(log);
+        }
+    }
+
+    public static List<TransactionMetadataEntry> deserializeEntry(Entry entry){
+        return deserializeEntry(entry.getDataBuffer());

Review Comment:
   Also need to consider the entry release.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImpl.java:
##########
@@ -56,4 +61,19 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(super.hashCode(), batchSize, batchIndex);
     }
+
+    /**
+     * Build the attribute ackSet to that {@link #batchIndex} is false and 
others is true.
+     */
+    public void deleteFromAckSet(){

Review Comment:
   And please add a test to cover the new method.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -343,6 +346,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup 
eventLoopGroup) throws
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionLogBufferedWriteAsyncFlushTrigger = Executors
+                .newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("pulsar-backlog-quota-checker"));

Review Comment:
   Can we reuse the `brokerClientSharedTimer` in the PulsarService? It's a 
little expensive to have a new scheduled executor with 1 millis tick time.



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -147,14 +175,10 @@ public void closeFailed(ManagedLedgerException exception, 
Object ctx) {
 
     @Override
     public CompletableFuture<Position> append(TransactionMetadataEntry 
transactionMetadataEntry) {
-        int transactionMetadataEntrySize = 
transactionMetadataEntry.getSerializedSize();
-        ByteBuf buf = 
PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, 
transactionMetadataEntrySize);
         CompletableFuture<Position> completableFuture = new 
CompletableFuture<>();
-        transactionMetadataEntry.writeTo(buf);
-        managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() 
{
+        bufferedWriter.asyncAddData(transactionMetadataEntry, new 
TxnLogBufferedWriter.AddDataCallback() {

Review Comment:
   Please check if the is released after receiving the callback in 
TxnLogBufferedWriter.addComplete



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -233,6 +289,28 @@ public void start() {
         }
     }
 
+    public static List<TransactionMetadataEntry> deserializeEntry(ByteBuf 
buffer){
+        // Check whether it is batched Entry.
+        buffer.markReaderIndex();
+        short magicNum = buffer.readShort();
+        buffer.resetReaderIndex();
+        if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){
+            // skip version
+            buffer.skipBytes(4);

Review Comment:
   It does not only skip the version but also skip the magnum right? We should 
use constant here, e.g. 
buffer.skipBytes(BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER_LENGH + 
BATCHED_ENTRY_DATA_PREFIX_VERSION_LENGH)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to