This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1539873b49 IGNITE-21757 Fix redeploy (#3528)
1539873b49 is described below

commit 1539873b49f041b4f39844d512012685d09d33ea
Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
AuthorDate: Fri Apr 5 10:14:19 2024 +0300

    IGNITE-21757 Fix redeploy (#3528)
    
    The operation id is now a random UUID instead of the metastorage revision, which fixes the ABA problem on redeploy.
---
 .../ignite/internal/deployunit/UnitStatus.java     | 26 +++-----
 .../metastore/ClusterEventCallbackImpl.java        | 19 +++---
 .../metastore/DeploymentUnitFailover.java          | 14 +++--
 .../deployunit/metastore/DeploymentUnitStore.java  |  9 ++-
 .../metastore/DeploymentUnitStoreImpl.java         | 33 +++++++---
 .../metastore/status/SerializeUtils.java           | 34 +++++++++-
 .../metastore/status/UnitClusterStatus.java        | 24 +++++---
 .../metastore/status/UnitNodeStatus.java           | 35 +++++------
 .../deployment/UnitStatusesSerializerTest.java     | 21 ++++---
 .../metastore/DeploymentUnitStoreImplTest.java     | 72 ++++++++++++++++++----
 .../deployunit/DeploymentManagerImplTest.java      | 15 +++--
 11 files changed, 202 insertions(+), 100 deletions(-)

diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
index 169fb174bb..0a85c60e7b 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.deployunit;
 
+import java.util.Objects;
+import java.util.UUID;
 import org.apache.ignite.compute.version.Version;
 
 /**
@@ -38,7 +40,7 @@ public abstract class UnitStatus {
      */
     private DeploymentStatus status;
 
-    private final long opId;
+    private final UUID opId;
 
     /**
      * Constructor.
@@ -48,7 +50,7 @@ public abstract class UnitStatus {
      * @param status Unit status.
      * @param opId Deployment unit operation identifier.
      */
-    public UnitStatus(String id, Version version, DeploymentStatus status, 
long opId) {
+    public UnitStatus(String id, Version version, DeploymentStatus status, 
UUID opId) {
         this.id = id;
         this.version = version;
         this.status = status;
@@ -96,7 +98,7 @@ public abstract class UnitStatus {
      *
      * @return Operation identifier of deployment unit creation.
      */
-    public long opId() {
+    public UUID opId() {
         return opId;
     }
 
@@ -108,24 +110,14 @@ public abstract class UnitStatus {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-
-        UnitStatus meta = (UnitStatus) o;
-
-        if (id != null ? !id.equals(meta.id) : meta.id != null) {
-            return false;
-        }
-        if (version != null ? !version.equals(meta.version) : meta.version != 
null) {
-            return false;
-        }
-        return status == meta.status;
+        UnitStatus that = (UnitStatus) o;
+        return Objects.equals(id, that.id) && Objects.equals(version, 
that.version) && status == that.status
+                && Objects.equals(opId, that.opId);
     }
 
     @Override
     public int hashCode() {
-        int result = id != null ? id.hashCode() : 0;
-        result = 31 * result + (version != null ? version.hashCode() : 0);
-        result = 31 * result + (status != null ? status.hashCode() : 0);
-        return result;
+        return Objects.hash(id, version, status, opId);
     }
 
     @Override
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
index b8130502e8..189f2ebef4 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.deployunit.metastore;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.version.Version;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import org.apache.ignite.internal.deployunit.FileDeployerService;
 import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
+import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 
 /** Listener of deployment unit cluster status changes. */
 public class ClusterEventCallbackImpl extends ClusterEventCallback {
@@ -64,35 +66,36 @@ public class ClusterEventCallbackImpl extends 
ClusterEventCallback {
         // Now the deployment unit can be removed from each target node and, 
after it, remove corresponding status records.
         deploymentUnitStore.getNodeStatus(nodeName, id, 
version).thenAccept(nodeStatus -> {
             if (nodeStatus != null && nodeStatus.status() == REMOVING) {
-                undeploy(id, version);
+                undeploy(id, version, nodeStatus.opId());
             }
         });
     }
 
-    private void undeploy(String id, Version version) {
+    private void undeploy(String id, Version version, UUID opId) {
         deployerService.undeploy(id, version).thenAccept(success -> {
             if (success) {
-                deploymentUnitStore.removeNodeStatus(nodeName, id, 
version).thenAccept(successRemove -> {
+                deploymentUnitStore.removeNodeStatus(nodeName, id, version, 
opId).thenAccept(successRemove -> {
                     if (successRemove) {
-                        removeClusterStatus(id, version);
+                        removeClusterStatus(id, version, opId);
                     }
                 });
             }
         });
     }
 
-    private void removeClusterStatus(String id, Version version) {
+    private void removeClusterStatus(String id, Version version, UUID opId) {
         cmgManager.logicalTopology().thenAccept(logicalTopology -> {
             Set<String> logicalNodes = logicalTopology.nodes().stream()
                     .map(LogicalNode::name)
                     .collect(Collectors.toSet());
-            deploymentUnitStore.getAllNodes(id, version).thenAccept(nodes -> {
-                boolean emptyTopology = nodes.stream()
+            deploymentUnitStore.getAllNodeStatuses(id, 
version).thenAccept(statuses -> {
+                boolean emptyTopology = statuses.stream()
+                        .map(UnitNodeStatus::nodeId)
                         .filter(logicalNodes::contains)
                         .findAny()
                         .isEmpty();
                 if (emptyTopology) {
-                    deploymentUnitStore.removeClusterStatus(id, version);
+                    deploymentUnitStore.removeClusterStatus(id, version, opId);
                 }
             });
         });
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
index 5f915341c2..ee07fb5edb 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java
@@ -23,6 +23,7 @@ import static 
org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 
 import java.util.Objects;
+import java.util.UUID;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
 import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
@@ -92,7 +93,7 @@ public class DeploymentUnitFailover {
         Version version = unitNodeStatus.version();
 
         if (unitClusterStatus == null) {
-            undeploy(id, version);
+            undeploy(id, version, unitNodeStatus.opId());
             return;
         }
 
@@ -126,11 +127,11 @@ public class DeploymentUnitFailover {
         }
     }
 
-    private void undeploy(String id, Version version) {
+    private void undeploy(String id, Version version, UUID opId) {
         deployer.undeploy(id, version)
                 .thenAccept(success -> {
                     if (success) {
-                        deploymentUnitStore.removeNodeStatus(nodeName, id, 
version);
+                        deploymentUnitStore.removeNodeStatus(nodeName, id, 
version, opId);
                     }
                 });
     }
@@ -138,11 +139,12 @@ public class DeploymentUnitFailover {
     private boolean checkAbaProblem(UnitClusterStatus clusterStatus, 
UnitNodeStatus nodeStatus) {
         String id = nodeStatus.id();
         Version version = nodeStatus.version();
-        if (clusterStatus.opId() != nodeStatus.opId()) {
+        UUID opId = nodeStatus.opId();
+        if (!Objects.equals(clusterStatus.opId(), opId)) {
             if (nodeStatus.status() == DEPLOYED) {
-                undeploy(id, version);
+                undeploy(id, version, opId);
             } else {
-                deploymentUnitStore.removeNodeStatus(nodeName, id, version);
+                deploymentUnitStore.removeNodeStatus(nodeName, id, version, 
opId);
             }
             return true;
         }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
index 74e87beb00..13fd19f2ae 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.deployunit.metastore;
 
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
@@ -133,7 +134,7 @@ public interface DeploymentUnitStore {
             String nodeId,
             String id,
             Version version,
-            long opId,
+            UUID opId,
             DeploymentStatus status);
 
     /**
@@ -179,16 +180,18 @@ public interface DeploymentUnitStore {
      *
      * @param id Deployment unit identifier.
      * @param version Deployment version identifier.
+     * @param opId Operation identifier.
      * @return Future with {@code true} result if removed successfully.
      */
-    CompletableFuture<Boolean> removeClusterStatus(String id, Version version);
+    CompletableFuture<Boolean> removeClusterStatus(String id, Version version, 
UUID opId);
 
     /**
      * Removes node status.
      *
      * @param id Deployment unit identifier.
      * @param version Deployment version identifier.
+     * @param opId Operation identifier.
      * @return Future with {@code true} result if removed successfully.
      */
-    CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, 
Version version);
+    CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, 
Version version, UUID opId);
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
index e454b37908..fb15c86285 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java
@@ -25,11 +25,13 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
 import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
+import static org.apache.ignite.internal.metastorage.dsl.Operations.remove;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -43,7 +45,6 @@ import 
org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus;
 import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.metastorage.dsl.Operations;
 
 /**
  * Implementation of {@link DeploymentUnitStore} based on {@link 
MetaStorageManager}.
@@ -142,8 +143,8 @@ public class DeploymentUnitStoreImpl implements 
DeploymentUnitStore {
     @Override
     public CompletableFuture<UnitClusterStatus> createClusterStatus(String id, 
Version version, Set<String> nodes) {
         ByteArray key = 
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
-        long revision = metaStorage.appliedRevision();
-        UnitClusterStatus clusterStatus = new UnitClusterStatus(id, version, 
UPLOADING, revision, nodes);
+        UUID operationId = UUID.randomUUID();
+        UnitClusterStatus clusterStatus = new UnitClusterStatus(id, version, 
UPLOADING, operationId, nodes);
         byte[] value = UnitClusterStatus.serialize(clusterStatus);
 
         return metaStorage.invoke(notExists(key), put(key, value), noop())
@@ -155,7 +156,7 @@ public class DeploymentUnitStoreImpl implements 
DeploymentUnitStore {
             String nodeId,
             String id,
             Version version,
-            long opId,
+            UUID opId,
             DeploymentStatus status
     ) {
         ByteArray key = 
NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray();
@@ -208,17 +209,31 @@ public class DeploymentUnitStoreImpl implements 
DeploymentUnitStore {
     }
 
     @Override
-    public CompletableFuture<Boolean> removeClusterStatus(String id, Version 
version) {
+    public CompletableFuture<Boolean> removeClusterStatus(String id, Version 
version, UUID opId) {
         ByteArray key = 
ClusterStatusKey.builder().id(id).version(version).build().toByteArray();
 
-        return metaStorage.invoke(exists(key), Operations.remove(key), noop());
+        return metaStorage.get(key).thenCompose(e -> {
+            UnitClusterStatus prev = UnitClusterStatus.deserialize(e.value());
+            if (!Objects.equals(prev.opId(), opId)) {
+                return falseCompletedFuture();
+            }
+
+            return metaStorage.invoke(revision(key).eq(e.revision()), 
remove(key), noop());
+        });
     }
 
     @Override
-    public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String 
id, Version version) {
+    public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String 
id, Version version, UUID opId) {
         ByteArray key = 
NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray();
 
-        return metaStorage.invoke(exists(key), Operations.remove(key), noop());
+        return metaStorage.get(key).thenCompose(e -> {
+            UnitNodeStatus prev = UnitNodeStatus.deserialize(e.value());
+            if (!Objects.equals(prev.opId(), opId)) {
+                return falseCompletedFuture();
+            }
+
+            return metaStorage.invoke(revision(key).eq(e.revision()), 
remove(key), noop());
+        });
     }
 
     /**
@@ -244,7 +259,7 @@ public class DeploymentUnitStoreImpl implements 
DeploymentUnitStore {
                     }
 
                     return metaStorage.invoke(
-                                    force ? exists(key) : 
revision(key).le(e.revision()),
+                                    force ? exists(key) : 
revision(key).eq(e.revision()),
                                     put(key, newValue),
                                     noop())
                             .thenCompose(finished -> {
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
index 120fc60edd..6068ed80eb 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java
@@ -24,8 +24,13 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.ignite.compute.version.Version;
+import org.apache.ignite.compute.version.VersionParseException;
+import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Serializer for {@link UnitStatus}.
@@ -106,7 +111,34 @@ public final class SerializeUtils {
         return new String(Base64.getDecoder().decode(s), UTF_8);
     }
 
-    public static boolean checkElement(String[] arr, int index) {
+    static boolean checkElement(String[] arr, int index) {
         return arr.length > index && arr[index] != null && 
!arr[index].isBlank();
     }
+
+    @Nullable
+    static Version deserializeVersion(String[] values, int index) {
+        try {
+            return checkElement(values, index) ? 
Version.parseVersion(decode(values[index])) : null;
+        } catch (VersionParseException e) {
+            return null;
+        }
+    }
+
+    @Nullable
+    static DeploymentStatus deserializeStatus(String[] values, int index) {
+        try {
+            return checkElement(values, index) ? 
DeploymentStatus.valueOf(decode(values[index])) : null;
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
+
+    @Nullable
+    static UUID deserializeUuid(String[] values, int index) {
+        try {
+            return checkElement(values, index) ? 
UUID.fromString(decode(values[index])) : null;
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
 }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
index f1b7b2728d..63939b7f1a 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java
@@ -18,12 +18,19 @@
 package org.apache.ignite.internal.deployunit.metastore.status;
 
 import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.checkElement;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decode;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decodeAsSet;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeStatus;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeUuid;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeVersion;
 
 import java.util.Collections;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Deployment unit cluster status.
@@ -44,7 +51,7 @@ public class UnitClusterStatus extends UnitStatus {
             String id,
             Version version,
             DeploymentStatus status,
-            long opId,
+            UUID opId,
             Set<String> initialNodesToDeploy
     ) {
         super(id, version, status, opId);
@@ -102,19 +109,18 @@ public class UnitClusterStatus extends UnitStatus {
      * @param value Serialized deployment unit cluster status.
      * @return Deserialized deployment unit cluster status.
      */
-    public static UnitClusterStatus deserialize(byte[] value) {
+    public static UnitClusterStatus deserialize(byte @Nullable [] value) {
         if (value == null || value.length == 0) {
-            return new UnitClusterStatus(null, null, null, 0, Set.of());
+            return new UnitClusterStatus(null, null, null, null, Set.of());
         }
 
         String[] values = SerializeUtils.deserialize(value);
 
-        String id = checkElement(values, 0) ? SerializeUtils.decode(values[0]) 
: null;
-        Version version = checkElement(values, 1) ? 
Version.parseVersion(SerializeUtils.decode(values[1])) : null;
-        DeploymentStatus status = checkElement(values, 2) ? 
DeploymentStatus.valueOf(SerializeUtils.decode(values[2])) : null;
-        long opId = checkElement(values, 3) ? 
Long.parseLong(SerializeUtils.decode(values[3])) : 0;
-        Set<String> nodes = checkElement(values, 4) ? 
SerializeUtils.decodeAsSet(values[4]) : Set.of();
-
+        String id = checkElement(values, 0) ? decode(values[0]) : null;
+        Version version = deserializeVersion(values, 1);
+        DeploymentStatus status = deserializeStatus(values, 2);
+        UUID opId = deserializeUuid(values, 3);
+        Set<String> nodes = checkElement(values, 4) ? decodeAsSet(values[4]) : 
Set.of();
 
         return new UnitClusterStatus(id, version, status, opId, nodes);
     }
diff --git 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
index 92dee426ab..0c32b788e3 100644
--- 
a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
+++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java
@@ -18,11 +18,16 @@
 package org.apache.ignite.internal.deployunit.metastore.status;
 
 import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.checkElement;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decode;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeStatus;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeUuid;
+import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeVersion;
 
+import java.util.UUID;
 import org.apache.ignite.compute.version.Version;
-import org.apache.ignite.compute.version.VersionParseException;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import org.apache.ignite.internal.deployunit.UnitStatus;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Deployment unit node status.
@@ -39,7 +44,7 @@ public class UnitNodeStatus extends UnitStatus {
      * @param opId Deployment unit operation identifier.
      * @param nodeId Node consistent id.
      */
-    public UnitNodeStatus(String id, Version version, DeploymentStatus status, 
long opId, String nodeId) {
+    public UnitNodeStatus(String id, Version version, DeploymentStatus status, 
UUID opId, String nodeId) {
         super(id, version, status, opId);
         this.nodeId = nodeId;
     }
@@ -94,30 +99,18 @@ public class UnitNodeStatus extends UnitStatus {
      * @param value Serialized deployment unit node status.
      * @return Deserialized deployment unit node status.
      */
-    public static UnitNodeStatus deserialize(byte[] value) {
+    public static UnitNodeStatus deserialize(byte @Nullable [] value) {
         if (value == null || value.length == 0) {
-            return new UnitNodeStatus(null, null, null, 0, null);
+            return new UnitNodeStatus(null, null, null, null, null);
         }
 
         String[] values = SerializeUtils.deserialize(value);
 
-        String id = checkElement(values, 0) ? SerializeUtils.decode(values[0]) 
: null;
-        Version version;
-        try {
-            version = checkElement(values, 1) ? 
Version.parseVersion(SerializeUtils.decode(values[1])) : null;
-        } catch (VersionParseException e) {
-            version = null;
-        }
-        DeploymentStatus status;
-        try {
-            status = checkElement(values, 2) ? 
DeploymentStatus.valueOf(SerializeUtils.decode(values[2])) : null;
-        } catch (IllegalArgumentException e) {
-            status = null;
-        }
-
-        long opId = checkElement(values, 3) ? 
Long.parseLong(SerializeUtils.decode(values[3])) : 0;
-
-        String nodeId = checkElement(values, 4) ? 
SerializeUtils.decode(values[4]) : null;
+        String id = checkElement(values, 0) ? decode(values[0]) : null;
+        Version version = deserializeVersion(values, 1);
+        DeploymentStatus status = deserializeStatus(values, 2);
+        UUID opId = deserializeUuid(values, 3);
+        String nodeId = checkElement(values, 4) ? decode(values[4]) : null;
 
         return new UnitNodeStatus(id, version, status, opId, nodeId);
     }
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
index 27a4b38a82..5b4609ac53 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java
@@ -24,6 +24,7 @@ import static 
org.junit.jupiter.params.provider.Arguments.arguments;
 
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.deployunit.DeploymentStatus;
 import org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils;
@@ -39,19 +40,19 @@ import org.junit.jupiter.params.provider.MethodSource;
 public class UnitStatusesSerializerTest {
     private static List<Arguments> nodeStatusProvider() {
         return List.of(
-                arguments(null, null, null, 0, null),
-                arguments("id", null, null, 0, null),
-                arguments("id", Version.LATEST, null, 0, null),
-                arguments("id", Version.LATEST, UPLOADING, 10, "node1")
+                arguments(null, null, null, null, null),
+                arguments("id", null, null, UUID.randomUUID(), null),
+                arguments("id", Version.LATEST, null, UUID.randomUUID(), null),
+                arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), 
"node1")
         );
     }
 
     private static List<Arguments> clusterStatusProvider() {
         return List.of(
-                arguments("id", Version.LATEST, UPLOADING, 0, Set.of()),
-                arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1")),
-                arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1", 
"node2")),
-                arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1", 
"node2", "node3"))
+                arguments("id", Version.LATEST, UPLOADING, null, Set.of()),
+                arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), 
Set.of("node1")),
+                arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), 
Set.of("node1", "node2")),
+                arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), 
Set.of("node1", "node2", "node3"))
         );
     }
 
@@ -61,7 +62,7 @@ public class UnitStatusesSerializerTest {
             String id,
             Version version,
             DeploymentStatus status,
-            long opId,
+            UUID opId,
             String nodeId
     ) {
         UnitNodeStatus nodeStatus = new UnitNodeStatus(id, version, status, 
opId, nodeId);
@@ -77,7 +78,7 @@ public class UnitStatusesSerializerTest {
             String id,
             Version version,
             DeploymentStatus status,
-            long opId,
+            UUID opId,
             Set<String> consistentIdLocation
     ) {
         UnitClusterStatus nodeStatus = new UnitClusterStatus(id, version, 
status, opId, consistentIdLocation);
diff --git 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
index 22c43fc572..8ad55e40bc 100644
--- 
a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
+++ 
b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback;
@@ -129,7 +130,7 @@ public class DeploymentUnitStoreImplTest extends 
BaseIgniteAbstractTest {
         assertThat(clusterStatusFuture, willCompleteSuccessfully());
 
         UnitClusterStatus clusterStatus = clusterStatusFuture.get();
-        long opId = clusterStatus.opId();
+        UUID opId = clusterStatus.opId();
 
         assertThat(metastore.getClusterStatus(id, version),
                 willBe(new UnitClusterStatus(id, version, UPLOADING, opId, Set.of())));
@@ -138,11 +139,59 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest {
         assertThat(metastore.getClusterStatus(id, version),
                 willBe(new UnitClusterStatus(id, version, DEPLOYED, opId, Set.of())));
 
-        assertThat(metastore.removeClusterStatus(id, version), willBe(true));
+        assertThat(metastore.removeClusterStatus(id, version, opId), willBe(true));
 
         assertThat(metastore.getClusterStatus(id, version), willBe(nullValue()));
     }
 
+    @Test
+    void clusterStatusAba() {
+        String id = "id1";
+        Version version = Version.parseVersion("1.1.1");
+
+        CompletableFuture<UnitClusterStatus> clusterStatusFuture1 = metastore.createClusterStatus(id, version, Set.of());
+        assertThat(clusterStatusFuture1, willCompleteSuccessfully());
+
+        UUID opId1 = clusterStatusFuture1.join().opId();
+
+        assertThat(metastore.removeClusterStatus(id, version, opId1), willBe(true));
+
+        // Create new cluster status with the same id and version
+        CompletableFuture<UnitClusterStatus> clusterStatusFuture2 = metastore.createClusterStatus(id, version, Set.of());
+        assertThat(clusterStatusFuture2, willCompleteSuccessfully());
+
+        UUID opId2 = clusterStatusFuture2.join().opId();
+
+        // Remove with the initial operation ID should fail
+        assertThat(metastore.removeClusterStatus(id, version, opId1), willBe(false));
+
+        // Remove with the correct operation ID should succeed
+        assertThat(metastore.removeClusterStatus(id, version, opId2), willBe(true));
+    }
+
+    @Test
+    void nodeStatusAba() {
+        String id = "id1";
+        Version version = Version.parseVersion("1.1.1");
+        String node1 = "node1";
+
+        UUID opId1 = UUID.randomUUID();
+        UUID opId2 = UUID.randomUUID();
+
+        assertThat(metastore.createNodeStatus(node1, id, version, opId1, UPLOADING), willBe(true));
+
+        assertThat(metastore.removeNodeStatus(node1, id, version, opId1), willBe(true));
+
+        // Create new node status with the same id and version
+        assertThat(metastore.createNodeStatus(node1, id, version, opId2, UPLOADING), willBe(true));
+
+        // Remove with the initial operation ID should fail
+        assertThat(metastore.removeNodeStatus(node1, id, version, opId1), willBe(false));
+
+        // Remove with the correct operation ID should succeed
+        assertThat(metastore.removeNodeStatus(node1, id, version, opId2), willBe(true));
+    }
+
     @Test
     public void nodeStatusTest() throws Exception {
         String id = "id2";
@@ -156,7 +205,7 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest {
         assertThat(clusterStatusFuture, willCompleteSuccessfully());
 
         UnitClusterStatus clusterStatus = clusterStatusFuture.get();
-        long opId = clusterStatus.opId();
+        UUID opId = clusterStatus.opId();
 
         assertThat(metastore.getClusterStatus(id, version),
                 willBe(new UnitClusterStatus(id, version, UPLOADING, opId, Set.of(node1, node2, node3))));
@@ -183,11 +232,11 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest {
                 willBe(contains((new UnitClusterStatus(id, version, DEPLOYED, opId, Set.of(node1, node2, node3)))))
         );
 
-        assertThat(metastore.removeClusterStatus(id, version), willBe(true));
+        assertThat(metastore.removeClusterStatus(id, version, opId), willBe(true));
         assertThat(metastore.getNodeStatus(node1, id, version),
                 willBe(new UnitNodeStatus(id, version, DEPLOYED, opId, node1)));
 
-        assertThat(metastore.removeNodeStatus(node1, id, version), willBe(true));
+        assertThat(metastore.removeNodeStatus(node1, id, version, opId), willBe(true));
         assertThat(metastore.getNodeStatus(node1, id, version), willBe(nullValue()));
     }
 
@@ -197,17 +246,18 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest {
         Version version = Version.parseVersion("1.1.1");
         String node1 = LOCAL_NODE;
 
-        assertThat(metastore.createNodeStatus(node1, id, version, 0, UPLOADING), willBe(true));
+        UUID opId = UUID.randomUUID();
+        assertThat(metastore.createNodeStatus(node1, id, version, opId, UPLOADING), willBe(true));
         assertThat(metastore.updateNodeStatus(node1, id, version, DEPLOYED), willBe(true));
         assertThat(metastore.updateNodeStatus(node1, id, version, OBSOLETE), willBe(true));
         assertThat(metastore.updateNodeStatus(node1, id, version, REMOVING), willBe(true));
 
         await().untilAsserted(() ->
                 assertThat(nodeHistory, containsInAnyOrder(
-                        new UnitNodeStatus(id, version, UPLOADING, 0, node1),
-                        new UnitNodeStatus(id, version, DEPLOYED, 0, node1),
-                        new UnitNodeStatus(id, version, OBSOLETE, 0, node1),
-                        new UnitNodeStatus(id, version, REMOVING, 0, node1)
+                        new UnitNodeStatus(id, version, UPLOADING, opId, node1),
+                        new UnitNodeStatus(id, version, DEPLOYED, opId, node1),
+                        new UnitNodeStatus(id, version, OBSOLETE, opId, node1),
+                        new UnitNodeStatus(id, version, REMOVING, opId, node1)
                 )));
     }
 
@@ -220,7 +270,7 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest {
         assertThat(clusterStatusFuture, willCompleteSuccessfully());
 
         UnitClusterStatus clusterStatus = clusterStatusFuture.get();
-        long opId = clusterStatus.opId();
+        UUID opId = clusterStatus.opId();
 
         assertThat(metastore.updateClusterStatus(id, version, DEPLOYED), willBe(true));
         assertThat(metastore.updateClusterStatus(id, version, OBSOLETE), willBe(true));
diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java
index a763fa5ed3..28d1da95a9 100644
--- a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java
+++ b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java
@@ -18,6 +18,10 @@
 package org.apache.ignite.internal.deployunit;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING;
+import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -26,6 +30,7 @@ import static org.mockito.Mockito.doReturn;
 import java.nio.file.Path;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -66,11 +71,11 @@ class DeploymentManagerImplTest extends BaseIgniteAbstractTest {
     @Test
     public void detectLatestDeployedVersion() {
         List<UnitClusterStatus> unitClusterStatuses = List.of(
-                new UnitClusterStatus("unit", Version.parseVersion("1.0.0"), DeploymentStatus.DEPLOYED, 0, Set.of("node1", "node2")),
-                new UnitClusterStatus("unit", Version.parseVersion("1.0.1"), DeploymentStatus.OBSOLETE, 0, Set.of("node1", "node2")),
-                new UnitClusterStatus("unit", Version.parseVersion("1.0.2"), DeploymentStatus.DEPLOYED, 0, Set.of("node1", "node2")),
-                new UnitClusterStatus("unit", Version.parseVersion("1.0.3"), DeploymentStatus.UPLOADING, 0,  Set.of("node1", "node2")),
-                new UnitClusterStatus("unit", Version.parseVersion("1.0.4"), DeploymentStatus.REMOVING, 0, Set.of("node1", "node2"))
+                new UnitClusterStatus("unit", Version.parseVersion("1.0.0"), DEPLOYED, UUID.randomUUID(), Set.of("node1", "node2")),
+                new UnitClusterStatus("unit", Version.parseVersion("1.0.1"), OBSOLETE, UUID.randomUUID(), Set.of("node1", "node2")),
+                new UnitClusterStatus("unit", Version.parseVersion("1.0.2"), DEPLOYED, UUID.randomUUID(), Set.of("node1", "node2")),
+                new UnitClusterStatus("unit", Version.parseVersion("1.0.3"), UPLOADING, UUID.randomUUID(),  Set.of("node1", "node2")),
+                new UnitClusterStatus("unit", Version.parseVersion("1.0.4"), REMOVING, UUID.randomUUID(), Set.of("node1", "node2"))
         );
 
         
doReturn(completedFuture(unitClusterStatuses)).when(deploymentUnitStore).getClusterStatuses("unit");


Reply via email to