Cyrill commented on code in PR #3161:
URL: https://github.com/apache/ignite-3/pull/3161#discussion_r1481000020


##########
modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java:
##########
@@ -771,6 +784,153 @@ public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse() throws
         assertNull(rec);
     }
 
+    @Test
+    public void testTsRecoveryForCursor() throws Exception {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        RecordView view1 = node(0).tables().table(TABLE_NAME).recordView();
+
+        for (int i = 0; i < 10; i++) {
+            view1.upsert(null, Tuple.create().set("key", i).set("val", 
"preload"));
+        }
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+
+        String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
+
+        IgniteImpl commitPartNode = commitPartitionPrimaryNode(leaseholder);
+
+        log.info("Transaction commit partition is determined [node={}].", 
commitPartNode.name());
+
+        IgniteImpl txCrdNode = nonPrimaryNode(leaseholder);
+
+        log.info("Transaction coordinator is chosen [node={}].", 
txCrdNode.name());
+
+        UUID orphanTxId = startTransactionWithCursorAndStopNode(txCrdNode);
+
+        IgniteImpl newCoordNode = node(0);
+
+        log.info("New transaction coordinator is chosen [node={}].", 
newCoordNode.name());
+
+        CompletableFuture<Void> txMsgCaptureFut = new CompletableFuture<>();
+
+        commitPartNode.dropMessages((nodeName, msg) -> {
+            if (msg instanceof TxRecoveryMessage) {
+                txMsgCaptureFut.complete(null);
+            }
+
+            return false;
+        });
+
+        Transaction tx = newCoordNode.transactions().begin();
+
+        RecordView view = newCoordNode.tables().table(TABLE_NAME).recordView();
+
+        var opFut = view.upsertAsync(tx, Tuple.create().set("key", 
42).set("val", "new"));
+
+        try {
+            opFut.get();
+        } catch (Exception ex) {
+            log.info("Expected conflict that have to start recovery: " + 
ex.getMessage());
+        }
+
+        assertThat(txMsgCaptureFut, willCompleteSuccessfully());
+    }
+
+    private UUID startTransactionWithCursorAndStopNode(IgniteImpl txCrdNode) 
throws Exception {
+        InternalTransaction rwTx = (InternalTransaction) 
txCrdNode.transactions().begin();
+
+        scanSingleEntryAndLeaveCursorOpen((TableViewInternal) 
txCrdNode.tables().table(TABLE_NAME), rwTx, null, null);
+
+        String txCrdNodeId = txCrdNode.id();
+
+        txCrdNode.stop();
+
+        assertTrue(waitForCondition(
+                () -> node(0).clusterNodes().stream().filter(n -> 
txCrdNodeId.equals(n.id())).count() == 0,
+                10_000)
+        );
+
+        return rwTx.id();
+    }
+
+    /**
+     * Starts a scan procedure for a specific transaction and reads only the first line from the cursor.
+     *
+     * @param tbl Scanned table.
+     * @param tx Transaction.
+     * @param idxId Index id.
+     * @throws Exception If failed.
+     */
+    private void scanSingleEntryAndLeaveCursorOpen(TableViewInternal tbl, 
InternalTransaction tx, Integer idxId, BinaryTuple exactKey)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to