JAkutenshi commented on code in PR #6004: URL: https://github.com/apache/ignite-3/pull/6004#discussion_r2137574561
########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java: ########## @@ -565,6 +609,10 @@ public ClusterNode getByConsistentId(String consistentId) { placementDriver = spy(new TestPlacementDriver(localNode)); + ZonePartitionId zonePartitionId = new ZonePartitionId(tableDescriptor.zoneId(), PART_ID); + + FailureManager failureHandler = new NoOpFailureManager(); Review Comment: Fair, renamed. ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java: ########## @@ -1275,6 +1323,492 @@ void exceptionIsReturnedIfTableProcessorIsAbsent() { assertThat(ex.getMessage(), is("Table is already destroyed [tableId=1]")); } + @Test + public void testTxStateReplicaRequestEmptyState() throws Exception { + doAnswer(invocation -> { + UUID txId = invocation.getArgument(5); + + txManager.updateTxMeta(txId, old -> new TxStateMeta( + ABORTED, + localNode.id(), + commitPartitionId, + null, + null, + null + )); + + return nullCompletedFuture(); + }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), any(), any()); + + CompletableFuture<ReplicaResult> fut = zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() + .groupId(tablePartitionIdMessage(grpId)) + .txId(newTxId()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .build(), randomUUID()); + + assertThat(fut, willSucceedFast()); + + TransactionMeta txMeta = (TransactionMeta) fut.get().result(); + + assertNotNull(txMeta); + + assertEquals(ABORTED, txMeta.txState()); + } + + @Test + public void testTxStateReplicaRequestCommitState() throws Exception { + UUID txId = newTxId(); + + txStateStorage.putForRebalance(txId, new TxMeta(COMMITTED, singletonList(new EnlistedPartitionGroup(grpId)), clock.now())); + + HybridTimestamp readTimestamp = clock.now(); + + CompletableFuture<ReplicaResult> fut = zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() + .groupId(tablePartitionIdMessage(grpId)) + .txId(txId) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .build(), localNode.id()); + + assertThat(fut, willSucceedFast()); + + TransactionMeta txMeta = (TransactionMeta) fut.get().result(); + + assertNotNull(txMeta); + assertEquals(COMMITTED, txMeta.txState()); + assertNotNull(txMeta.commitTimestamp()); + assertTrue(readTimestamp.compareTo(txMeta.commitTimestamp()) > 0); + } + + @Test + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true") + public void testWriteIntentOnPrimaryReplicaSingleUpdate() { + zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked -> tableReplicaProcessor); + + UUID txId = newTxId(); + AtomicInteger counter = new AtomicInteger(); + + testWriteIntentOnPrimaryReplica( + txId, + () -> { + BinaryRow binaryRow = binaryRow(counter.getAndIncrement()); + + return TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(RW_INSERT) + .schemaVersion(binaryRow.schemaVersion()) + .binaryTuple(binaryRow.tupleSlice()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .timestamp(clock.now()) + .build(); + }, + () -> checkRowInMvStorage(binaryRow(0), true) + ); + + cleanup(txId); + + zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID); + } + + + @Test + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true") + public void testWriteIntentOnPrimaryReplicaUpdateAll() { + zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked -> tableReplicaProcessor); + + UUID txId = newTxId(); + AtomicInteger counter = new AtomicInteger(); + + testWriteIntentOnPrimaryReplica( + txId, + () -> { + int cntr = counter.getAndIncrement(); + BinaryRow binaryRow0 = binaryRow(cntr * 2); + BinaryRow binaryRow1 = binaryRow(cntr * 2 + 1); + + return TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(RW_UPSERT_ALL) + .schemaVersion(binaryRow0.schemaVersion()) + .binaryTuples(asList(binaryRow0.tupleSlice(), binaryRow1.tupleSlice())) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .timestamp(clock.now()) + .build(); + }, + () -> checkRowInMvStorage(binaryRow(0), true) + ); + + cleanup(txId); + + zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true") + void writeIntentSwitchForCompactedCatalogTimestampWorks(boolean commit) { + int earliestVersion = 999; + Catalog mockEarliestCatalog = mock(Catalog.class); + when(mockEarliestCatalog.version()).thenReturn(earliestVersion); + + UUID txId = newTxId(); + HybridTimestamp beginTs = beginTimestamp(txId); + HybridTimestamp commitTs = clock.now(); + + HybridTimestamp reliableCatalogVersionTs = commit ? commitTs : beginTs; + when(catalogService.activeCatalog(reliableCatalogVersionTs.longValue())).thenThrow(new CatalogNotFoundException("Oops")); + when(catalogService.earliestCatalog()).thenReturn(mockEarliestCatalog); + + zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked -> tableReplicaProcessor); + + CompletableFuture<ReplicaResult> invokeFuture = zonePartitionReplicaListener.invoke( + TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableIds(Set.of(grpId.tableId())) + .txId(txId) + .commit(commit) + .commitTimestamp(commit ? commitTs : null) + .build(), + localNode.id() + ); + + assertThat(invokeFuture, willCompleteSuccessfully()); + assertThat(invokeFuture.join().applyResult().replicationFuture(), willCompleteSuccessfully()); + + verify(mockRaftClient).run(commandCaptor.capture()); + WriteIntentSwitchCommand command = (WriteIntentSwitchCommand) commandCaptor.getValue(); + + assertThat(command.requiredCatalogVersion(), is(earliestVersion)); + + zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID); + } + + Review Comment: Removed. ########## modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java: ########## @@ -1275,6 +1323,492 @@ void exceptionIsReturnedIfTableProcessorIsAbsent() { assertThat(ex.getMessage(), is("Table is already destroyed [tableId=1]")); } + @Test + public void testTxStateReplicaRequestEmptyState() throws Exception { + doAnswer(invocation -> { + UUID txId = invocation.getArgument(5); + + txManager.updateTxMeta(txId, old -> new TxStateMeta( + ABORTED, + localNode.id(), + commitPartitionId, + null, + null, + null + )); + + return nullCompletedFuture(); + }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(), any(), any()); + + CompletableFuture<ReplicaResult> fut = zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() + .groupId(tablePartitionIdMessage(grpId)) + .txId(newTxId()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .build(), randomUUID()); + + assertThat(fut, willSucceedFast()); + + TransactionMeta txMeta = (TransactionMeta) fut.get().result(); + + assertNotNull(txMeta); + + assertEquals(ABORTED, txMeta.txState()); + } + + @Test + public void testTxStateReplicaRequestCommitState() throws Exception { + UUID txId = newTxId(); + + txStateStorage.putForRebalance(txId, new TxMeta(COMMITTED, singletonList(new EnlistedPartitionGroup(grpId)), clock.now())); + + HybridTimestamp readTimestamp = clock.now(); + + CompletableFuture<ReplicaResult> fut = zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest() + .groupId(tablePartitionIdMessage(grpId)) + .txId(txId) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .build(), localNode.id()); + + assertThat(fut, willSucceedFast()); + + TransactionMeta txMeta = (TransactionMeta) fut.get().result(); + + assertNotNull(txMeta); + assertEquals(COMMITTED, txMeta.txState()); + assertNotNull(txMeta.commitTimestamp()); + assertTrue(readTimestamp.compareTo(txMeta.commitTimestamp()) > 0); + } + + @Test + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true") + public void testWriteIntentOnPrimaryReplicaSingleUpdate() { + zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked -> tableReplicaProcessor); + + UUID txId = newTxId(); + AtomicInteger counter = new AtomicInteger(); + + testWriteIntentOnPrimaryReplica( + txId, + () -> { + BinaryRow binaryRow = binaryRow(counter.getAndIncrement()); + + return TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(RW_INSERT) + .schemaVersion(binaryRow.schemaVersion()) + .binaryTuple(binaryRow.tupleSlice()) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .timestamp(clock.now()) + .build(); + }, + () -> checkRowInMvStorage(binaryRow(0), true) + ); + + cleanup(txId); + + zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID); + } + + + @Test + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true") + public void testWriteIntentOnPrimaryReplicaUpdateAll() { + zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked -> tableReplicaProcessor); + + UUID txId = newTxId(); + AtomicInteger counter = new AtomicInteger(); + + testWriteIntentOnPrimaryReplica( + txId, + () -> { + int cntr = counter.getAndIncrement(); + BinaryRow binaryRow0 = binaryRow(cntr * 2); + BinaryRow binaryRow1 = binaryRow(cntr * 2 + 1); + + return TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableId(TABLE_ID) + .transactionId(txId) + .requestType(RW_UPSERT_ALL) + .schemaVersion(binaryRow0.schemaVersion()) + .binaryTuples(asList(binaryRow0.tupleSlice(), binaryRow1.tupleSlice())) + .enlistmentConsistencyToken(ANY_ENLISTMENT_CONSISTENCY_TOKEN) + .commitPartitionId(commitPartitionId()) + .coordinatorId(localNode.id()) + .timestamp(clock.now()) + .build(); + }, + () -> checkRowInMvStorage(binaryRow(0), true) + ); + + cleanup(txId); + + zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @WithSystemProperty(key = COLOCATION_FEATURE_FLAG, value = "true") + void writeIntentSwitchForCompactedCatalogTimestampWorks(boolean commit) { + int earliestVersion = 999; + Catalog mockEarliestCatalog = mock(Catalog.class); + when(mockEarliestCatalog.version()).thenReturn(earliestVersion); + + UUID txId = newTxId(); + HybridTimestamp beginTs = beginTimestamp(txId); + HybridTimestamp commitTs = clock.now(); + + HybridTimestamp reliableCatalogVersionTs = commit ? commitTs : beginTs; + when(catalogService.activeCatalog(reliableCatalogVersionTs.longValue())).thenThrow(new CatalogNotFoundException("Oops")); + when(catalogService.earliestCatalog()).thenReturn(mockEarliestCatalog); + + zonePartitionReplicaListener.addTableReplicaProcessor(TABLE_ID, mocked -> tableReplicaProcessor); + + CompletableFuture<ReplicaResult> invokeFuture = zonePartitionReplicaListener.invoke( + TX_MESSAGES_FACTORY.writeIntentSwitchReplicaRequest() + .groupId(tablePartitionIdMessage(grpId)) + .tableIds(Set.of(grpId.tableId())) + .txId(txId) + .commit(commit) + .commitTimestamp(commit ? commitTs : null) + .build(), + localNode.id() + ); + + assertThat(invokeFuture, willCompleteSuccessfully()); + assertThat(invokeFuture.join().applyResult().replicationFuture(), willCompleteSuccessfully()); + + verify(mockRaftClient).run(commandCaptor.capture()); + WriteIntentSwitchCommand command = (WriteIntentSwitchCommand) commandCaptor.getValue(); + + assertThat(command.requiredCatalogVersion(), is(earliestVersion)); + + zonePartitionReplicaListener.removeTableReplicaProcessor(TABLE_ID); + } + + + @Test + public void abortsSuccessfully() { + AtomicReference<Boolean> committed = interceptFinishTxCommand(); + + CompletableFuture<?> future = beginAndAbortTx(); + + assertThat(future, willSucceedFast()); + + assertThat(committed.get(), is(false)); + } + + @Test + public void commitsOnSameSchemaSuccessfully() { + when(validationSchemasSource.tableSchemaVersionsBetween(anyInt(), any(), any(HybridTimestamp.class))) + .thenReturn(List.of( + tableSchema(CURRENT_SCHEMA_VERSION, List.of(nullableColumn("col"))) + )); + + AtomicReference<Boolean> committed = interceptFinishTxCommand(); + + CompletableFuture<?> future = beginAndCommitTx(); + + assertThat(future, willSucceedFast()); + + assertThat(committed.get(), is(true)); + } + + Review Comment: And removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org