denis-chudov commented on code in PR #3496:
URL: https://github.com/apache/ignite-3/pull/3496#discussion_r1547447458


##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java:
##########
@@ -314,6 +319,58 @@ private void markTxAbortedInTxStateStorage(IgniteImpl 
primaryNode, InternalTrans
         storage.put(tx.id(), txMetaToSet);
     }
 
+
+    @Test
+    void testCleanupReplicatedMessage() throws ExecutionException, 
InterruptedException {
+        Context context = prepareTransactionData();
+

Review Comment:
   empty line



##########
modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDurableFinishTest.java:
##########
@@ -314,6 +319,58 @@ private void markTxAbortedInTxStateStorage(IgniteImpl 
primaryNode, InternalTrans
         storage.put(tx.id(), txMetaToSet);
     }
 
+
+    @Test
+    void testCleanupReplicatedMessage() throws ExecutionException, 
InterruptedException {
+        Context context = prepareTransactionData();
+
+
+        DefaultMessagingService primaryMessaging = 
messaging(context.primaryNode);
+
+        CompletableFuture<Void> cleanupReplicatedFuture = new 
CompletableFuture<>();
+
+        primaryMessaging.dropMessages((s, networkMessage) -> {
+            if (networkMessage instanceof TxCleanupMessageResponse) {
+                logger().info("Received message: {}.", networkMessage);
+
+                TxCleanupMessageResponse message = (TxCleanupMessageResponse) 
networkMessage;
+
+                if (message instanceof TxCleanupMessageErrorResponse) {
+                    TxCleanupMessageErrorResponse error = 
(TxCleanupMessageErrorResponse) message;
+
+                    logger().error("Cleanup Error: ", error);
+
+                    return false;
+                }
+
+                Object result = message.result();
+
+                if (result != null) {
+                    cleanupReplicatedFuture.complete(null);
+                }
+            }
+
+            return false;
+        });
+
+        commitAndValidate(context.tx, context.tbl, context.keyTpl);
+
+        for (CompletableFuture<?> future : futures) {

Review Comment:
   do we actually add something to this `futures` list?



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java:
##########
@@ -43,6 +43,8 @@ public class TxStateMeta implements TransactionMeta {
 
     private final Long initialVacuumObservationTimestamp;
 
+    private final Long cleanupCompletionTimestamp;

Review Comment:
   Seems it should be added to equals/hashcode



##########
modules/transactions/src/main/java/org/apache/ignite/internal/tx/message/TxCleanupMessageResponse.java:
##########
@@ -17,12 +17,18 @@
 
 package org.apache.ignite.internal.tx.message;
 
+import org.apache.ignite.internal.network.annotations.Marshallable;
 import org.apache.ignite.internal.network.annotations.Transferable;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Cleanup transaction message response.
  */
 @Transferable(TxMessageGroup.TX_CLEANUP_MSG_RESPONSE)
 public interface TxCleanupMessageResponse extends TimestampAware {
+
+    @Nullable
+    @Marshallable
+    Object result();

Review Comment:
   I see only a usage with `CleanupReplicatedInfo`, why `Object` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to