JAkutenshi commented on code in PR #6004:
URL: https://github.com/apache/ignite-3/pull/6004#discussion_r2137575078


##########
modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java:
##########
@@ -1275,6 +1323,492 @@ void exceptionIsReturnedIfTableProcessorIsAbsent() {
         assertThat(ex.getMessage(), is("Table is already destroyed 
[tableId=1]"));
     }
 
+    @Test
+    public void testTxStateReplicaRequestEmptyState() throws Exception {
+        doAnswer(invocation -> {
+            UUID txId = invocation.getArgument(5);
+
+            txManager.updateTxMeta(txId, old -> new TxStateMeta(
+                    ABORTED,
+                    localNode.id(),
+                    commitPartitionId,
+                    null,
+                    null,
+                    null
+            ));
+
+            return nullCompletedFuture();
+        }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), 
any(), any());
+
+        CompletableFuture<ReplicaResult> fut = 
zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
+                .groupId(tablePartitionIdMessage(grpId))
+                .txId(newTxId())
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+                .build(), randomUUID());
+
+        assertThat(fut, willSucceedFast());
+
+        TransactionMeta txMeta = (TransactionMeta) fut.get().result();
+
+        assertNotNull(txMeta);
+
+        assertEquals(ABORTED, txMeta.txState());
+    }
+
+    @Test
+    public void testTxStateReplicaRequestCommitState() throws Exception {
+        UUID txId = newTxId();
+
+        txStateStorage.putForRebalance(txId, new TxMeta(COMMITTED, 
singletonList(new EnlistedPartitionGroup(grpId)), clock.now()));
+
+        HybridTimestamp readTimestamp = clock.now();
+
+        CompletableFuture<ReplicaResult> fut = 
zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
+                .groupId(tablePartitionIdMessage(grpId))
+                .txId(txId)
+                .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+                .build(), localNode.id());
+
+        assertThat(fut, willSucceedFast());
+
+        TransactionMeta txMeta = (TransactionMeta) fut.get().result();
+
+        assertNotNull(txMeta);
+        assertEquals(COMMITTED, txMeta.txState());
+        assertNotNull(txMeta.commitTimestamp());
+        assertTrue(readTimestamp.compareTo(txMeta.commitTimestamp()) > 0);
+    }
+
+    @Test
+    @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")
+    public void testWriteIntentOnPrimaryReplicaSingleUpdate() {
+        zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked 
-> tableReplicaProcessor);
+
+        UUID txId = newTxId();
+        AtomicInteger counter = new AtomicInteger();
+
+        testWriteIntentOnPrimaryReplica(
+                txId,
+                () -> {
+                    BinaryRow binaryRow = binaryRow(counter.getAndIncrement());
+
+                    return 
TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+                            .groupId(tablePartitionIdMessage(grpId))
+                            .tableId(TABLE_ID)
+                            .transactionId(txId)
+                            .requestType(RW_INSERT)
+                            .schemaVersion(binaryRow.schemaVersion())
+                            .binaryTuple(binaryRow.tupleSlice())
+                            
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+                            .commitPartitionId(commitPartitionId())
+                            .coordinatorId(localNode.id())
+                            .timestamp(clock.now())
+                            .build();
+                },
+                () -> checkRowInMvStorage(binaryRow(0), true)
+        );
+
+        cleanup(txId);
+
+        zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID);
+    }
+
+
+    @Test
+    @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")
+    public void testWriteIntentOnPrimaryReplicaUpdateAll() {
+        zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked 
-> tableReplicaProcessor);
+
+        UUID txId = newTxId();
+        AtomicInteger counter = new AtomicInteger();
+
+        testWriteIntentOnPrimaryReplica(
+                txId,
+                () -> {
+                    int cntr = counter.getAndIncrement();
+                    BinaryRow binaryRow0 = binaryRow(cntr * 2);
+                    BinaryRow binaryRow1 = binaryRow(cntr * 2 + 1);
+
+                    return 
TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+                            .groupId(tablePartitionIdMessage(grpId))
+                            .tableId(TABLE_ID)
+                            .transactionId(txId)
+                            .requestType(RW_UPSERT_ALL)
+                            .schemaVersion(binaryRow0.schemaVersion())
+                            .binaryTuples(asList(binaryRow0.tupleSlice(), 
binaryRow1.tupleSlice()))
+                            
.enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN)
+                            .commitPartitionId(commitPartitionId())
+                            .coordinatorId(localNode.id())
+                            .timestamp(clock.now())
+                            .build();
+                },
+                () -> checkRowInMvStorage(binaryRow(0), true)
+        );
+
+        cleanup(txId);
+
+        zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID);
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true")
+    void writeIntentSwitchForCompactedCatalogTimestampWorks(boolean commit) {
+        int earliestVersion = 999;
+        Catalog mockEarliestCatalog = mock(Catalog.class);
+        when(mockEarliestCatalog.version()).thenReturn(earliestVersion);
+
+        UUID txId = newTxId();
+        HybridTimestamp beginTs = beginTimestamp(txId);
+        HybridTimestamp commitTs = clock.now();
+
+        HybridTimestamp reliableCatalogVersionTs = commit ? commitTs : beginTs;
+        
when(catalogService.activeCatalog(reliableCatalogVersionTs.longValue())).thenThrow(new
 CatalogNotFoundException("Oops"));
+        when(catalogService.earliestCatalog()).thenReturn(mockEarliestCatalog);
+
+        zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked 
-> tableReplicaProcessor);
+
+        CompletableFuture<ReplicaResult> invokeFuture = 
zonePartitionReplicaListener.invoke(
+                TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest()
+                        .groupId(tablePartitionIdMessage(grpId))
+                        .tableIds(Set.of(grpId.tableId()))
+                        .txId(txId)
+                        .commit(commit)
+                        .commitTimestamp(commit ? commitTs : null)
+                        .build(),
+                localNode.id()
+        );
+
+        assertThat(invokeFuture, willCompleteSuccessfully());
+        assertThat(invokeFuture.join().applyResult().replicationFuture(), 
willCompleteSuccessfully());
+
+        verify(mockRaftClient).run(commandCaptor.capture());
+        WriteIntentSwitchCommand command = (WriteIntentSwitchCommand) 
commandCaptor.getValue();
+
+        assertThat(command.requiredCatalogVersion(), is(earliestVersion));
+
+        zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID);
+    }
+
+
+    @Test
+    public void abortsSuccessfully() {
+        AtomicReference<Boolean> committed = interceptFinishTxCommand();
+
+        CompletableFuture<?> future = beginAndAbortTx();
+
+        assertThat(future, willSucceedFast());
+
+        assertThat(committed.get(), is(false));
+    }
+
+    @Test
+    public void commitsOnSameSchemaSuccessfully() {
+        when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), 
any(), any(HybridTimestamp.class)))
+                .thenReturn(List.of(
+                        tableSchema(CURRENT_SCHEMA_VERSION, 
List.of(nullableColumn("col")))
+                ));
+
+        AtomicReference<Boolean> committed = interceptFinishTxCommand();
+
+        CompletableFuture<?> future = beginAndCommitTx();
+
+        assertThat(future, willSucceedFast());
+
+        assertThat(committed.get(), is(true));
+    }
+
+

Review Comment:
   And removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to