poorbarcode commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r926884869
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnBatchedPositionImplTest.java:
##########
@@ -0,0 +1,38 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import org.apache.pulsar.common.util.collections.BitSetRecyclable;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TxnBatchedPositionImplTest {
+
+ @DataProvider(name = "batchSizeAndBatchIndexArgsArray")
+ private Object[][] batchSizeAndBatchIndexArgsArray(){
+ Object[][] args = new Object[5][];
+ args[0] = new Object[]{10, 5};
+ args[1] = new Object[]{64, 0};
+ args[2] = new Object[]{64, 63};
+ args[3] = new Object[]{230, 120};
+ args[4] = new Object[]{256, 255};
+ return args;
Review Comment:
Already fixed.
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import
org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+ @DataProvider(name = "variedBufferedWriteConfigProvider")
+ private Object[][] variedBufferedWriteConfigProvider(){
+ Object[][] args = new Object[4][];
+ args[0] = new Object[]{true, true};
+ args[1] = new Object[]{false, false};
+ args[2] = new Object[]{true, false};
+ args[3] = new Object[]{false, true};
+ return args;
Review Comment:
Already fixed.
##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pulsar.transaction.coordinator.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
+import org.apache.pulsar.common.api.proto.Subscription;
+import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
+import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
+import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
+import
org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
+import org.apache.pulsar.transaction.coordinator.test.MockedBookKeeperTestCase;
+import static org.mockito.Mockito.*;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class MLTransactionLogImplTest extends MockedBookKeeperTestCase {
+
+ @DataProvider(name = "variedBufferedWriteConfigProvider")
+ private Object[][] variedBufferedWriteConfigProvider(){
+ Object[][] args = new Object[4][];
+ args[0] = new Object[]{true, true};
+ args[1] = new Object[]{false, false};
+ args[2] = new Object[]{true, false};
+ args[3] = new Object[]{false, true};
+ return args;
+ }
+
+ @Test(dataProvider = "variedBufferedWriteConfigProvider")
+ public void test1(boolean writeWithBatch, boolean readWithBatch) throws
Exception {
+ ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(2);
+ TxnLogBufferedWriterConfig bufferedWriterConfig = new
TxnLogBufferedWriterConfig();
+ TransactionCoordinatorID transactionCoordinatorID =
TransactionCoordinatorID.get(0);
+ MLTransactionLogImpl mlTransactionLog = new
MLTransactionLogImpl(TransactionCoordinatorID.get(0), factory,
+ new ManagedLedgerConfig(), bufferedWriterConfig,
scheduledExecutorService);
+ mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
+
+ // Add logs: start transaction.
+ for (int i = 1; i <= 30; i++){
+ TransactionMetadataEntry transactionLog = new
TransactionMetadataEntry();
+ transactionLog.setTxnidMostBits(i);
+ transactionLog.setTxnidLeastBits(i);
+ transactionLog.setMaxLocalTxnId(i);
+ transactionLog.setStartTime(i);
+ transactionLog.setTimeoutMs(i);
+
transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW);
+ mlTransactionLog.append(transactionLog);
+ }
+ // Add logs: add partition.
+ for (int i = 1; i <= 30; i++){
+ TransactionMetadataEntry transactionLog = new
TransactionMetadataEntry();
+ transactionLog.setTxnidLeastBits(i);
+ transactionLog.setMaxLocalTxnId(i);
+
transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_PARTITION);
+ transactionLog.addAllPartitions(Arrays.asList(String.valueOf(i)));
+ mlTransactionLog.append(transactionLog);
+ }
+ // Add logs: add subscription.
+ for (int i = 1; i <= 30; i++){
+ TransactionMetadataEntry transactionLog = new
TransactionMetadataEntry();
+ transactionLog.setTxnidLeastBits(i);
+ transactionLog.setMaxLocalTxnId(i);
+
transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.ADD_SUBSCRIPTION);
+ Subscription subscription = new Subscription();
+ subscription.setSubscription(String.valueOf(i));
+ subscription.setTopic(String.valueOf(i));
+ transactionLog.addAllSubscriptions(Arrays.asList(subscription));
+ mlTransactionLog.append(transactionLog);
+ }
+ // Add logs: commit.
+ for (int i = 1; i <= 30; i++){
+ TransactionMetadataEntry transactionLog = new
TransactionMetadataEntry();
+ transactionLog.setTxnidLeastBits(i);
+ transactionLog.setMaxLocalTxnId(i);
+
transactionLog.setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.UPDATE);
+ mlTransactionLog.append(transactionLog);
+ }
+ // Verify the mapping of position.
+
+
+ // Verify recover correct.
+ TransactionTimeoutTracker timeoutTracker =
mock(TransactionTimeoutTracker.class);
+ MLTransactionSequenceIdGenerator sequenceIdGenerator =
mock(MLTransactionSequenceIdGenerator.class);
+ TransactionRecoverTracker recoverTracker =
mock(TransactionRecoverTracker.class);
+ // TODO 记录发送出去的请求
Review Comment:
Already fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]