congbobo184 commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925483775


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java:
##########
@@ -283,4 +365,43 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
         }
 
     }
+
+    private static final FastThreadLocal<BatchedTransactionMetadataEntry> localBatchedTransactionLogCache =
+            new FastThreadLocal<>() {
+                @Override
+                protected BatchedTransactionMetadataEntry initialValue() throws Exception {
+                    return new BatchedTransactionMetadataEntry();
+                }
+            };
+
+    private static class TransactionLogDataSerializer
+            implements TxnLogBufferedWriter.DataSerializer<TransactionMetadataEntry>{
+
+        private static final TransactionLogDataSerializer INSTANCE = new TransactionLogDataSerializer();
+
+        @Override
+        public int getSerializedSize(TransactionMetadataEntry data) {
+            return data.getSerializedSize();
+        }
+
+        @Override
+        public ByteBuf serialize(TransactionMetadataEntry data) {
+            int transactionMetadataEntrySize = data.getSerializedSize();
+            ByteBuf buf =
+                    PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize);
+            data.writeTo(buf);
+            return buf;
+        }
+
+        @Override
+        public ByteBuf serialize(ArrayList<TransactionMetadataEntry> transactionLogArray) {
+            // Since all writes are in the same thread, so we can use threadLocal variables here.
+            BatchedTransactionMetadataEntry data = localBatchedTransactionLogCache.get();
+            data.addAllTransactionLogs(transactionLogArray);

Review Comment:
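   Clear the cached entry before adding the logs. The thread-local instance is reused across calls, so entries from an earlier batch would otherwise still be in it.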
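
   To illustrate the concern, here is a minimal, self-contained sketch. It does not use the Pulsar classes: `BatchEntry` and `ThreadLocalReuseSketch` are hypothetical stand-ins, and a plain `java.lang.ThreadLocal` replaces Netty's `FastThreadLocal`. It only shows how a reused thread-local accumulator keeps growing unless it is reset before each batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a reusable batch container (not the Pulsar class).
final class BatchEntry {
    private final List<String> logs = new ArrayList<>();

    void addAll(List<String> items) {
        logs.addAll(items);
    }

    void clear() {
        logs.clear();
    }

    int size() {
        return logs.size();
    }
}

final class ThreadLocalReuseSketch {
    // One cached BatchEntry per thread, created lazily on first access.
    private static final ThreadLocal<BatchEntry> CACHE =
            ThreadLocal.withInitial(BatchEntry::new);

    // Drops the current thread's cached entry so the next get() creates a fresh one.
    static void reset() {
        CACHE.remove();
    }

    // Reuses the cached entry without resetting it:
    // items from earlier batches are still present.
    static int batchSizeWithoutClear(List<String> batch) {
        BatchEntry entry = CACHE.get();
        entry.addAll(batch);
        return entry.size();
    }

    // Resets the cached entry first, so it holds only the current batch.
    static int batchSizeWithClear(List<String> batch) {
        BatchEntry entry = CACHE.get();
        entry.clear();
        entry.addAll(batch);
        return entry.size();
    }

    public static void main(String[] args) {
        reset();
        System.out.println(batchSizeWithoutClear(List.of("a", "b"))); // 2
        System.out.println(batchSizeWithoutClear(List.of("c")));      // 3, stale items remain

        reset();
        System.out.println(batchSizeWithClear(List.of("a", "b")));    // 2
        System.out.println(batchSizeWithClear(List.of("c")));         // 1
    }
}
```

   In the sketch, the second call without `clear()` reports three items even though the batch has one, which is the kind of leak the comment above is pointing at.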



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java:
##########
@@ -55,12 +60,19 @@
 
 public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
 
+    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);

Review Comment:
   1. It would be better to add a dedicated test that writes a batched transaction log, then opens a new cursor on the transaction log managedLedger and reads the batched log back, to verify that the TransactionCoordinator can recover successfully.
   2. Add a test for log deletion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
