vldpyatkov commented on code in PR #7052:
URL: https://github.com/apache/ignite-3/pull/7052#discussion_r2577950226


##########
modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java:
##########
@@ -1061,7 +1061,37 @@ private NetworkMessage 
prepareReplicaErrorResponse(boolean sendTimestamp, Throwa
             return REPLICA_MESSAGES_FACTORY
                     .errorTimestampAwareReplicaResponse()
                     .throwable(ex)
-                    .timestamp(clockService.now())
+                    .timestamp(clockService.current())
+                    .build();
+        } else {
+            return REPLICA_MESSAGES_FACTORY
+                    .errorReplicaResponse()
+                    .throwable(ex)
+                    .build();
+        }
+    }
+
+    private NetworkMessage prepareDelayedReplicaResponse(boolean 
sendTimestamp, Object result) {
+        if (sendTimestamp) {
+            return REPLICA_MESSAGES_FACTORY
+                    .timestampAwareReplicaResponse()
+                    .result(result)
+                    .timestamp(clockService.current())
+                    .build();
+        } else {
+            return REPLICA_MESSAGES_FACTORY
+                    .replicaResponse()
+                    .result(result)
+                    .build();
+        }
+    }
+
+    private NetworkMessage prepareDelayedReplicaErrorResponse(boolean 
sendTimestamp, Throwable ex) {

Review Comment:
   Maybe we can use this method `prepareReplicaErrorResponse` because there is 
not any difference?



##########
modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTableMapUtils.java:
##########
@@ -126,7 +127,10 @@ private static <R, C> boolean unlockOnRetry(
             }
             Transaction tx0 = txns.get(i);
             tx0.rollbackAsync().exceptionally(e -> {
-                log.error("Failed to rollback a transactional batch: [tx=" + 
tx0 + ']', e);
+                Throwable cause = unwrapCause(e);
+                if (!(cause instanceof IgniteClientConnectionException)) {

Review Comment:
   What happens when we get other exception? We don't even log it.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java:
##########
@@ -251,6 +285,7 @@ ReadWriteTxContext lockTxForNewUpdates(UUID txId, 
Map<ReplicationGroupId, Pendin
 
     abstract static class TxContext {
         volatile long inflights = 0; // Updated under lock.
+        Throwable err;

Review Comment:
   I don't think this is necessary because a modification happens only in the 
critical section (in the compute closure of the concurrent hash map).



##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -221,13 +221,27 @@ public CompletableFuture<Void> commitAsync() {
         boolean enabled = 
ch.protocolContext().isFeatureSupported(TX_PIGGYBACK);
         CompletableFuture<Void> finishFut = enabled ? 
ch.inflights().finishFuture(txId()) : nullCompletedFuture();
 
-        CompletableFuture<Void> mainFinishFut = finishFut.thenCompose(ignored 
-> ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
-            w.out().packLong(id);
+        CompletableFuture<Void> mainFinishFut = finishFut.handle((ignored, e) 
-> {
+            if (e != null) {
+                ch.serviceAsync(ClientOp.TX_ROLLBACK, w -> {
+                    w.out().packLong(id);
 
-            if (!isReadOnly && enabled) {
-                packEnlisted(w);
+                    if (!isReadOnly && enabled) {
+                        packEnlisted(w);
+                    }
+                }, r -> null);
+
+                return CompletableFuture.<Void>failedFuture(e);
             }
-        }, r -> null));
+
+            return ch.serviceAsync(ClientOp.TX_COMMIT, w -> {
+                w.out().packLong(id);
+
+                if (!isReadOnly && enabled) {
+                    packEnlisted(w);
+                }
+            }, r -> (Void) null);
+        }).thenCompose(x -> x);

Review Comment:
   We mostly use `Function.identity()` in a case like this.



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java:
##########
@@ -170,7 +170,25 @@ public void removeInflight(UUID txId) {
 
         // Avoid completion under lock.
         if (tuple != null) {
-            tuple.onInflightsRemoved();
+            tuple.onInflightsRemoved(tuple.err);
+        }
+    }
+
+    void removeInflight(UUID txId, Throwable cause) {
+        // Can be null if tx was aborted and inflights were removed from the 
collection.
+        TxContext tuple = txCtxMap.computeIfPresent(txId, (uuid, ctx) -> {
+            ctx.removeInflight(txId);
+
+            if (cause != null && ctx.err == null) {
+                ctx.err = cause; // Retain only first exception.

Review Comment:
   By the way, we should get an exception from the tuple:
   `tuple.onInflightsRemoved(tuple.err);`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to