Cyrill commented on code in PR #3161: URL: https://github.com/apache/ignite-3/pull/3161#discussion_r1481000020
##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java:
##########
@@ -771,6 +784,153 @@ public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse() throws
         assertNull(rec);
     }
 
+    @Test
+    public void testTsRecoveryForCursor() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        RecordView view1 = node(0).tables().table(TABLE_NAME).recordView();
+
+        for (int i = 0; i < 10; i++) {
+            view1.upsert(null, Tuple.create().set("key", i).set("val", "preload"));
+        }
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+        IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator is chosen [node={}].", txCrdNode.name());
+
+        UUID orphanTxId = startTransactionWithCursorAndStopNode(txCrdNode);
+
+        IgniteImpl newCoordNode = node(0);
+
+        log.info("New transaction coordinator is chosen [node={}].", newCoordNode.name());
+
+        CompletableFuture<Void> txMsgCaptureFut = new CompletableFuture<>();
+
+        commitPartNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                txMsgCaptureFut.complete(null);
+            }
+
+            return false;
+        });
+
+        Transaction tx = newCoordNode.transactions().begin();
+
+        RecordView view = newCoordNode.tables().table(TABLE_NAME).recordView();
+
+        var opFut = view.upsertAsync(tx, Tuple.create().set("key", 42).set("val", "new"));
+
+        try {
+            opFut.get();
+        } catch (Exception ex) {
+            log.info("Expected conflict that have to start recovery: " + ex.getMessage());
+        }
+
+        assertThat(txMsgCaptureFut, willCompleteSuccessfully());
+    }
+
+    private UUID startTransactionWithCursorAndStopNode(IgniteImpl txCrdNode) throws Exception {
+        InternalTransaction rwTx = (InternalTransaction) txCrdNode.transactions().begin();
+
+        scanSingleEntryAndLeaveCursorOpen((TableViewInternal) txCrdNode.tables().table(TABLE_NAME), rwTx, null, null);
+
+        String txCrdNodeId = txCrdNode.id();
+
+        txCrdNode.stop();
+
+        assertTrue(waitForCondition(
+                () -> node(0).clusterNodes().stream().filter(n -> txCrdNodeId.equals(n.id())).count() == 0,
+                10_000)
+        );
+
+        return rwTx.id();
+    }
+
+    /**
+     * Starts a scan procedure for a specific transaction and reads only the first line from the cursor.
+     *
+     * @param tbl Scanned table.
+     * @param tx Transaction.
+     * @param idxId Index id.
+     * @throws Exception If failed.
+     */
+    private void scanSingleEntryAndLeaveCursorOpen(TableViewInternal tbl, InternalTransaction tx, Integer idxId, BinaryTuple exactKey)

Review Comment:
   Done

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org