ascherbakoff commented on code in PR #7588:
URL: https://github.com/apache/ignite-3/pull/7588#discussion_r2851691951


##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -206,6 +225,79 @@ public void commit() throws TransactionException {
         sync(commitAsync());
     }
 
+    /**
+     * Discards the directly mapped transaction fragments in case of 
coordinator side transaction invalidation
+     * (either kill or implicit rollback due to mapping failure, see 
postEnlist).
+     *
+     * @param killed Killed flag.
+     *
+     * @return The future.
+     */
+    public CompletableFuture<Void> discardDirectMappings(boolean killed) {
+        enlistPartitionLock.writeLock().lock();
+
+        try {
+            if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
+                return finishFut.get();
+            }
+        } finally {
+            enlistPartitionLock.writeLock().unlock();
+        }
+
+        return sendDiscardRequests().handle((r, e) -> {
+            setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+            ch.inflights().erase(txId());
+            this.finishFut.get().complete(null);
+            return null;
+        });
+    }
+
+    private CompletableFuture<Void> sendDiscardRequests() {
+        assert finishFut != null;
+
+        if 
(!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
+            return nullCompletedFuture();
+        }
+
+        Map<String, List<TablePartitionId>> enlistments = new HashMap<>();
+
+        for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String, 
Long>>> entry : enlisted.entrySet()) {
+            IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+            if (info == null) {
+                continue; // Ignore incomplete enlistments.
+            }
+
+            enlistments.computeIfAbsent(info.get1(), k -> new 
ArrayList<>()).add(entry.getKey());
+        }
+
+        List<CompletableFuture<Void>> futures = new 
ArrayList<>(enlistments.size());
+
+        for (Entry<String, List<TablePartitionId>> entry : 
enlistments.entrySet()) {
+            CompletableFuture<Void> discardFut = 
reliableChannel.getNodeChannelAsync(entry.getKey()).thenCompose(ch -> {

Review Comment:
   Actually, re-routing logic is not necessary in this scenario and only 
complicates server-side implementation. 
   If a client connection is lost, directly mapped tx resource will be cleaned 
up by other measures.
   
   I've changed the code so it uses only an existing connection.
   
   However, I don't understand why it's forbidden to re-establish a new 
connection here. It's perfectly fine to me and can help address temporary 
network issues.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to