This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 1539873b49 IGNITE-21757 Fix redeploy (#3528) 1539873b49 is described below commit 1539873b49f041b4f39844d512012685d09d33ea Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> AuthorDate: Fri Apr 5 10:14:19 2024 +0300 IGNITE-21757 Fix redeploy (#3528) A UUID is now used as the operation id, which fixes the ABA problem. --- .../ignite/internal/deployunit/UnitStatus.java | 26 +++----- .../metastore/ClusterEventCallbackImpl.java | 19 +++--- .../metastore/DeploymentUnitFailover.java | 14 +++-- .../deployunit/metastore/DeploymentUnitStore.java | 9 ++- .../metastore/DeploymentUnitStoreImpl.java | 33 +++++++--- .../metastore/status/SerializeUtils.java | 34 +++++++++- .../metastore/status/UnitClusterStatus.java | 24 +++++--- .../metastore/status/UnitNodeStatus.java | 35 +++++------ .../deployment/UnitStatusesSerializerTest.java | 21 ++++--- .../metastore/DeploymentUnitStoreImplTest.java | 72 ++++++++++++++++++---- .../deployunit/DeploymentManagerImplTest.java | 15 +++-- 11 files changed, 202 insertions(+), 100 deletions(-) diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java index 169fb174bb..0a85c60e7b 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/UnitStatus.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.deployunit; +import java.util.Objects; +import java.util.UUID; import org.apache.ignite.compute.version.Version; /** @@ -38,7 +40,7 @@ public abstract class UnitStatus { */ private DeploymentStatus status; - private final long opId; + private final UUID opId; /** * Constructor. @@ -48,7 +50,7 @@ public abstract class UnitStatus { * @param status Unit status. 
* @param opId Deployment unit operation identifier. */ - public UnitStatus(String id, Version version, DeploymentStatus status, long opId) { + public UnitStatus(String id, Version version, DeploymentStatus status, UUID opId) { this.id = id; this.version = version; this.status = status; @@ -96,7 +98,7 @@ public abstract class UnitStatus { * * @return Operation identifier of deployment unit creation. */ - public long opId() { + public UUID opId() { return opId; } @@ -108,24 +110,14 @@ public abstract class UnitStatus { if (o == null || getClass() != o.getClass()) { return false; } - - UnitStatus meta = (UnitStatus) o; - - if (id != null ? !id.equals(meta.id) : meta.id != null) { - return false; - } - if (version != null ? !version.equals(meta.version) : meta.version != null) { - return false; - } - return status == meta.status; + UnitStatus that = (UnitStatus) o; + return Objects.equals(id, that.id) && Objects.equals(version, that.version) && status == that.status + && Objects.equals(opId, that.opId); } @Override public int hashCode() { - int result = id != null ? id.hashCode() : 0; - result = 31 * result + (version != null ? version.hashCode() : 0); - result = 31 * result + (status != null ? 
status.hashCode() : 0); - return result; + return Objects.hash(id, version, status, opId); } @Override diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java index b8130502e8..189f2ebef4 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/ClusterEventCallbackImpl.java @@ -20,12 +20,14 @@ package org.apache.ignite.internal.deployunit.metastore; import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.deployunit.FileDeployerService; import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus; +import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; /** Listener of deployment unit cluster status changes. */ public class ClusterEventCallbackImpl extends ClusterEventCallback { @@ -64,35 +66,36 @@ public class ClusterEventCallbackImpl extends ClusterEventCallback { // Now the deployment unit can be removed from each target node and, after it, remove corresponding status records. 
deploymentUnitStore.getNodeStatus(nodeName, id, version).thenAccept(nodeStatus -> { if (nodeStatus != null && nodeStatus.status() == REMOVING) { - undeploy(id, version); + undeploy(id, version, nodeStatus.opId()); } }); } - private void undeploy(String id, Version version) { + private void undeploy(String id, Version version, UUID opId) { deployerService.undeploy(id, version).thenAccept(success -> { if (success) { - deploymentUnitStore.removeNodeStatus(nodeName, id, version).thenAccept(successRemove -> { + deploymentUnitStore.removeNodeStatus(nodeName, id, version, opId).thenAccept(successRemove -> { if (successRemove) { - removeClusterStatus(id, version); + removeClusterStatus(id, version, opId); } }); } }); } - private void removeClusterStatus(String id, Version version) { + private void removeClusterStatus(String id, Version version, UUID opId) { cmgManager.logicalTopology().thenAccept(logicalTopology -> { Set<String> logicalNodes = logicalTopology.nodes().stream() .map(LogicalNode::name) .collect(Collectors.toSet()); - deploymentUnitStore.getAllNodes(id, version).thenAccept(nodes -> { - boolean emptyTopology = nodes.stream() + deploymentUnitStore.getAllNodeStatuses(id, version).thenAccept(statuses -> { + boolean emptyTopology = statuses.stream() + .map(UnitNodeStatus::nodeId) .filter(logicalNodes::contains) .findAny() .isEmpty(); if (emptyTopology) { - deploymentUnitStore.removeClusterStatus(id, version); + deploymentUnitStore.removeClusterStatus(id, version, opId); } }); }); diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java index 5f915341c2..ee07fb5edb 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java +++ 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitFailover.java @@ -23,6 +23,7 @@ import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING; import static org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING; import java.util.Objects; +import java.util.UUID; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; @@ -92,7 +93,7 @@ public class DeploymentUnitFailover { Version version = unitNodeStatus.version(); if (unitClusterStatus == null) { - undeploy(id, version); + undeploy(id, version, unitNodeStatus.opId()); return; } @@ -126,11 +127,11 @@ public class DeploymentUnitFailover { } } - private void undeploy(String id, Version version) { + private void undeploy(String id, Version version, UUID opId) { deployer.undeploy(id, version) .thenAccept(success -> { if (success) { - deploymentUnitStore.removeNodeStatus(nodeName, id, version); + deploymentUnitStore.removeNodeStatus(nodeName, id, version, opId); } }); } @@ -138,11 +139,12 @@ public class DeploymentUnitFailover { private boolean checkAbaProblem(UnitClusterStatus clusterStatus, UnitNodeStatus nodeStatus) { String id = nodeStatus.id(); Version version = nodeStatus.version(); - if (clusterStatus.opId() != nodeStatus.opId()) { + UUID opId = nodeStatus.opId(); + if (!Objects.equals(clusterStatus.opId(), opId)) { if (nodeStatus.status() == DEPLOYED) { - undeploy(id, version); + undeploy(id, version, opId); } else { - deploymentUnitStore.removeNodeStatus(nodeName, id, version); + deploymentUnitStore.removeNodeStatus(nodeName, id, version, opId); } return true; } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java 
b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java index 74e87beb00..13fd19f2ae 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStore.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.deployunit.metastore; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.deployunit.DeploymentStatus; @@ -133,7 +134,7 @@ public interface DeploymentUnitStore { String nodeId, String id, Version version, - long opId, + UUID opId, DeploymentStatus status); /** @@ -179,16 +180,18 @@ public interface DeploymentUnitStore { * * @param id Deployment unit identifier. * @param version Deployment version identifier. + * @param opId Operation identifier. * @return Future with {@code true} result if removed successfully. */ - CompletableFuture<Boolean> removeClusterStatus(String id, Version version); + CompletableFuture<Boolean> removeClusterStatus(String id, Version version, UUID opId); /** * Removes node status. * * @param id Deployment unit identifier. * @param version Deployment version identifier. + * @param opId Operation identifier. * @return Future with {@code true} result if removed successfully. 
*/ - CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, Version version); + CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, Version version, UUID opId); } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java index e454b37908..fb15c86285 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/DeploymentUnitStoreImpl.java @@ -25,11 +25,13 @@ import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists; import static org.apache.ignite.internal.metastorage.dsl.Conditions.revision; import static org.apache.ignite.internal.metastorage.dsl.Operations.noop; import static org.apache.ignite.internal.metastorage.dsl.Operations.put; +import static org.apache.ignite.internal.metastorage.dsl.Operations.remove; import static org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture; import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.stream.Collectors; @@ -43,7 +45,6 @@ import org.apache.ignite.internal.deployunit.metastore.status.UnitClusterStatus; import org.apache.ignite.internal.deployunit.metastore.status.UnitNodeStatus; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.metastorage.MetaStorageManager; -import org.apache.ignite.internal.metastorage.dsl.Operations; /** * Implementation of {@link DeploymentUnitStore} based on {@link MetaStorageManager}. 
@@ -142,8 +143,8 @@ public class DeploymentUnitStoreImpl implements DeploymentUnitStore { @Override public CompletableFuture<UnitClusterStatus> createClusterStatus(String id, Version version, Set<String> nodes) { ByteArray key = ClusterStatusKey.builder().id(id).version(version).build().toByteArray(); - long revision = metaStorage.appliedRevision(); - UnitClusterStatus clusterStatus = new UnitClusterStatus(id, version, UPLOADING, revision, nodes); + UUID operationId = UUID.randomUUID(); + UnitClusterStatus clusterStatus = new UnitClusterStatus(id, version, UPLOADING, operationId, nodes); byte[] value = UnitClusterStatus.serialize(clusterStatus); return metaStorage.invoke(notExists(key), put(key, value), noop()) @@ -155,7 +156,7 @@ public class DeploymentUnitStoreImpl implements DeploymentUnitStore { String nodeId, String id, Version version, - long opId, + UUID opId, DeploymentStatus status ) { ByteArray key = NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray(); @@ -208,17 +209,31 @@ public class DeploymentUnitStoreImpl implements DeploymentUnitStore { } @Override - public CompletableFuture<Boolean> removeClusterStatus(String id, Version version) { + public CompletableFuture<Boolean> removeClusterStatus(String id, Version version, UUID opId) { ByteArray key = ClusterStatusKey.builder().id(id).version(version).build().toByteArray(); - return metaStorage.invoke(exists(key), Operations.remove(key), noop()); + return metaStorage.get(key).thenCompose(e -> { + UnitClusterStatus prev = UnitClusterStatus.deserialize(e.value()); + if (!Objects.equals(prev.opId(), opId)) { + return falseCompletedFuture(); + } + + return metaStorage.invoke(revision(key).eq(e.revision()), remove(key), noop()); + }); } @Override - public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, Version version) { + public CompletableFuture<Boolean> removeNodeStatus(String nodeId, String id, Version version, UUID opId) { ByteArray key = 
NodeStatusKey.builder().id(id).version(version).nodeId(nodeId).build().toByteArray(); - return metaStorage.invoke(exists(key), Operations.remove(key), noop()); + return metaStorage.get(key).thenCompose(e -> { + UnitNodeStatus prev = UnitNodeStatus.deserialize(e.value()); + if (!Objects.equals(prev.opId(), opId)) { + return falseCompletedFuture(); + } + + return metaStorage.invoke(revision(key).eq(e.revision()), remove(key), noop()); + }); } /** @@ -244,7 +259,7 @@ public class DeploymentUnitStoreImpl implements DeploymentUnitStore { } return metaStorage.invoke( - force ? exists(key) : revision(key).le(e.revision()), + force ? exists(key) : revision(key).eq(e.revision()), put(key, newValue), noop()) .thenCompose(finished -> { diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java index 120fc60edd..6068ed80eb 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/SerializeUtils.java @@ -24,8 +24,13 @@ import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; +import org.apache.ignite.compute.version.Version; +import org.apache.ignite.compute.version.VersionParseException; +import org.apache.ignite.internal.deployunit.DeploymentStatus; import org.apache.ignite.internal.deployunit.UnitStatus; +import org.jetbrains.annotations.Nullable; /** * Serializer for {@link UnitStatus}. 
@@ -106,7 +111,34 @@ public final class SerializeUtils { return new String(Base64.getDecoder().decode(s), UTF_8); } - public static boolean checkElement(String[] arr, int index) { + static boolean checkElement(String[] arr, int index) { return arr.length > index && arr[index] != null && !arr[index].isBlank(); } + + @Nullable + static Version deserializeVersion(String[] values, int index) { + try { + return checkElement(values, index) ? Version.parseVersion(decode(values[index])) : null; + } catch (VersionParseException e) { + return null; + } + } + + @Nullable + static DeploymentStatus deserializeStatus(String[] values, int index) { + try { + return checkElement(values, index) ? DeploymentStatus.valueOf(decode(values[index])) : null; + } catch (IllegalArgumentException e) { + return null; + } + } + + @Nullable + static UUID deserializeUuid(String[] values, int index) { + try { + return checkElement(values, index) ? UUID.fromString(decode(values[index])) : null; + } catch (IllegalArgumentException e) { + return null; + } + } } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java index f1b7b2728d..63939b7f1a 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitClusterStatus.java @@ -18,12 +18,19 @@ package org.apache.ignite.internal.deployunit.metastore.status; import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.checkElement; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decode; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decodeAsSet; +import static 
org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeStatus; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeUuid; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeVersion; import java.util.Collections; import java.util.Set; +import java.util.UUID; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.deployunit.DeploymentStatus; import org.apache.ignite.internal.deployunit.UnitStatus; +import org.jetbrains.annotations.Nullable; /** * Deployment unit cluster status. @@ -44,7 +51,7 @@ public class UnitClusterStatus extends UnitStatus { String id, Version version, DeploymentStatus status, - long opId, + UUID opId, Set<String> initialNodesToDeploy ) { super(id, version, status, opId); @@ -102,19 +109,18 @@ public class UnitClusterStatus extends UnitStatus { * @param value Serialized deployment unit cluster status. * @return Deserialized deployment unit cluster status. */ - public static UnitClusterStatus deserialize(byte[] value) { + public static UnitClusterStatus deserialize(byte @Nullable [] value) { if (value == null || value.length == 0) { - return new UnitClusterStatus(null, null, null, 0, Set.of()); + return new UnitClusterStatus(null, null, null, null, Set.of()); } String[] values = SerializeUtils.deserialize(value); - String id = checkElement(values, 0) ? SerializeUtils.decode(values[0]) : null; - Version version = checkElement(values, 1) ? Version.parseVersion(SerializeUtils.decode(values[1])) : null; - DeploymentStatus status = checkElement(values, 2) ? DeploymentStatus.valueOf(SerializeUtils.decode(values[2])) : null; - long opId = checkElement(values, 3) ? Long.parseLong(SerializeUtils.decode(values[3])) : 0; - Set<String> nodes = checkElement(values, 4) ? SerializeUtils.decodeAsSet(values[4]) : Set.of(); - + String id = checkElement(values, 0) ? 
decode(values[0]) : null; + Version version = deserializeVersion(values, 1); + DeploymentStatus status = deserializeStatus(values, 2); + UUID opId = deserializeUuid(values, 3); + Set<String> nodes = checkElement(values, 4) ? decodeAsSet(values[4]) : Set.of(); return new UnitClusterStatus(id, version, status, opId, nodes); } diff --git a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java index 92dee426ab..0c32b788e3 100644 --- a/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java +++ b/modules/code-deployment/src/main/java/org/apache/ignite/internal/deployunit/metastore/status/UnitNodeStatus.java @@ -18,11 +18,16 @@ package org.apache.ignite.internal.deployunit.metastore.status; import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.checkElement; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.decode; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeStatus; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeUuid; +import static org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils.deserializeVersion; +import java.util.UUID; import org.apache.ignite.compute.version.Version; -import org.apache.ignite.compute.version.VersionParseException; import org.apache.ignite.internal.deployunit.DeploymentStatus; import org.apache.ignite.internal.deployunit.UnitStatus; +import org.jetbrains.annotations.Nullable; /** * Deployment unit node status. @@ -39,7 +44,7 @@ public class UnitNodeStatus extends UnitStatus { * @param opId Deployment unit operation identifier. * @param nodeId Node consistent id. 
*/ - public UnitNodeStatus(String id, Version version, DeploymentStatus status, long opId, String nodeId) { + public UnitNodeStatus(String id, Version version, DeploymentStatus status, UUID opId, String nodeId) { super(id, version, status, opId); this.nodeId = nodeId; } @@ -94,30 +99,18 @@ public class UnitNodeStatus extends UnitStatus { * @param value Serialized deployment unit node status. * @return Deserialized deployment unit node status. */ - public static UnitNodeStatus deserialize(byte[] value) { + public static UnitNodeStatus deserialize(byte @Nullable [] value) { if (value == null || value.length == 0) { - return new UnitNodeStatus(null, null, null, 0, null); + return new UnitNodeStatus(null, null, null, null, null); } String[] values = SerializeUtils.deserialize(value); - String id = checkElement(values, 0) ? SerializeUtils.decode(values[0]) : null; - Version version; - try { - version = checkElement(values, 1) ? Version.parseVersion(SerializeUtils.decode(values[1])) : null; - } catch (VersionParseException e) { - version = null; - } - DeploymentStatus status; - try { - status = checkElement(values, 2) ? DeploymentStatus.valueOf(SerializeUtils.decode(values[2])) : null; - } catch (IllegalArgumentException e) { - status = null; - } - - long opId = checkElement(values, 3) ? Long.parseLong(SerializeUtils.decode(values[3])) : 0; - - String nodeId = checkElement(values, 4) ? SerializeUtils.decode(values[4]) : null; + String id = checkElement(values, 0) ? decode(values[0]) : null; + Version version = deserializeVersion(values, 1); + DeploymentStatus status = deserializeStatus(values, 2); + UUID opId = deserializeUuid(values, 3); + String nodeId = checkElement(values, 4) ? 
decode(values[4]) : null; return new UnitNodeStatus(id, version, status, opId, nodeId); } diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java index 27a4b38a82..5b4609ac53 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/UnitStatusesSerializerTest.java @@ -24,6 +24,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import java.util.List; import java.util.Set; +import java.util.UUID; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.deployunit.DeploymentStatus; import org.apache.ignite.internal.deployunit.metastore.status.SerializeUtils; @@ -39,19 +40,19 @@ import org.junit.jupiter.params.provider.MethodSource; public class UnitStatusesSerializerTest { private static List<Arguments> nodeStatusProvider() { return List.of( - arguments(null, null, null, 0, null), - arguments("id", null, null, 0, null), - arguments("id", Version.LATEST, null, 0, null), - arguments("id", Version.LATEST, UPLOADING, 10, "node1") + arguments(null, null, null, null, null), + arguments("id", null, null, UUID.randomUUID(), null), + arguments("id", Version.LATEST, null, UUID.randomUUID(), null), + arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), "node1") ); } private static List<Arguments> clusterStatusProvider() { return List.of( - arguments("id", Version.LATEST, UPLOADING, 0, Set.of()), - arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1")), - arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1", "node2")), - arguments("id", Version.LATEST, UPLOADING, 1, Set.of("node1", "node2", "node3")) + arguments("id", Version.LATEST, UPLOADING, null, Set.of()), + arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), 
Set.of("node1")), + arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), Set.of("node1", "node2")), + arguments("id", Version.LATEST, UPLOADING, UUID.randomUUID(), Set.of("node1", "node2", "node3")) ); } @@ -61,7 +62,7 @@ public class UnitStatusesSerializerTest { String id, Version version, DeploymentStatus status, - long opId, + UUID opId, String nodeId ) { UnitNodeStatus nodeStatus = new UnitNodeStatus(id, version, status, opId, nodeId); @@ -77,7 +78,7 @@ public class UnitStatusesSerializerTest { String id, Version version, DeploymentStatus status, - long opId, + UUID opId, Set<String> consistentIdLocation ) { UnitClusterStatus nodeStatus = new UnitClusterStatus(id, version, status, opId, consistentIdLocation); diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java index 22c43fc572..8ad55e40bc 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/deployment/metastore/DeploymentUnitStoreImplTest.java @@ -34,6 +34,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.deployunit.metastore.ClusterEventCallback; @@ -129,7 +130,7 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest { assertThat(clusterStatusFuture, willCompleteSuccessfully()); UnitClusterStatus clusterStatus = clusterStatusFuture.get(); - long opId = clusterStatus.opId(); + UUID opId = clusterStatus.opId(); assertThat(metastore.getClusterStatus(id, version), willBe(new UnitClusterStatus(id, version, UPLOADING, opId, Set.of()))); @@ -138,11 +139,59 @@ public 
class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest { assertThat(metastore.getClusterStatus(id, version), willBe(new UnitClusterStatus(id, version, DEPLOYED, opId, Set.of()))); - assertThat(metastore.removeClusterStatus(id, version), willBe(true)); + assertThat(metastore.removeClusterStatus(id, version, opId), willBe(true)); assertThat(metastore.getClusterStatus(id, version), willBe(nullValue())); } + @Test + void clusterStatusAba() { + String id = "id1"; + Version version = Version.parseVersion("1.1.1"); + + CompletableFuture<UnitClusterStatus> clusterStatusFuture1 = metastore.createClusterStatus(id, version, Set.of()); + assertThat(clusterStatusFuture1, willCompleteSuccessfully()); + + UUID opId1 = clusterStatusFuture1.join().opId(); + + assertThat(metastore.removeClusterStatus(id, version, opId1), willBe(true)); + + // Create new cluster status with the same id and version + CompletableFuture<UnitClusterStatus> clusterStatusFuture2 = metastore.createClusterStatus(id, version, Set.of()); + assertThat(clusterStatusFuture2, willCompleteSuccessfully()); + + UUID opId2 = clusterStatusFuture2.join().opId(); + + // Remove with the initial operation ID should fail + assertThat(metastore.removeClusterStatus(id, version, opId1), willBe(false)); + + // Remove with the correct operation ID should succeed + assertThat(metastore.removeClusterStatus(id, version, opId2), willBe(true)); + } + + @Test + void nodeStatusAba() { + String id = "id1"; + Version version = Version.parseVersion("1.1.1"); + String node1 = "node1"; + + UUID opId1 = UUID.randomUUID(); + UUID opId2 = UUID.randomUUID(); + + assertThat(metastore.createNodeStatus(node1, id, version, opId1, UPLOADING), willBe(true)); + + assertThat(metastore.removeNodeStatus(node1, id, version, opId1), willBe(true)); + + // Create new node status with the same id and version + assertThat(metastore.createNodeStatus(node1, id, version, opId2, UPLOADING), willBe(true)); + + // Remove with the initial operation ID 
should fail + assertThat(metastore.removeNodeStatus(node1, id, version, opId1), willBe(false)); + + // Remove with the correct operation ID should succeed + assertThat(metastore.removeNodeStatus(node1, id, version, opId2), willBe(true)); + } + @Test public void nodeStatusTest() throws Exception { String id = "id2"; @@ -156,7 +205,7 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest { assertThat(clusterStatusFuture, willCompleteSuccessfully()); UnitClusterStatus clusterStatus = clusterStatusFuture.get(); - long opId = clusterStatus.opId(); + UUID opId = clusterStatus.opId(); assertThat(metastore.getClusterStatus(id, version), willBe(new UnitClusterStatus(id, version, UPLOADING, opId, Set.of(node1, node2, node3)))); @@ -183,11 +232,11 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest { willBe(contains((new UnitClusterStatus(id, version, DEPLOYED, opId, Set.of(node1, node2, node3))))) ); - assertThat(metastore.removeClusterStatus(id, version), willBe(true)); + assertThat(metastore.removeClusterStatus(id, version, opId), willBe(true)); assertThat(metastore.getNodeStatus(node1, id, version), willBe(new UnitNodeStatus(id, version, DEPLOYED, opId, node1))); - assertThat(metastore.removeNodeStatus(node1, id, version), willBe(true)); + assertThat(metastore.removeNodeStatus(node1, id, version, opId), willBe(true)); assertThat(metastore.getNodeStatus(node1, id, version), willBe(nullValue())); } @@ -197,17 +246,18 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest { Version version = Version.parseVersion("1.1.1"); String node1 = LOCAL_NODE; - assertThat(metastore.createNodeStatus(node1, id, version, 0, UPLOADING), willBe(true)); + UUID opId = UUID.randomUUID(); + assertThat(metastore.createNodeStatus(node1, id, version, opId, UPLOADING), willBe(true)); assertThat(metastore.updateNodeStatus(node1, id, version, DEPLOYED), willBe(true)); assertThat(metastore.updateNodeStatus(node1, id, version, OBSOLETE), 
willBe(true)); assertThat(metastore.updateNodeStatus(node1, id, version, REMOVING), willBe(true)); await().untilAsserted(() -> assertThat(nodeHistory, containsInAnyOrder( - new UnitNodeStatus(id, version, UPLOADING, 0, node1), - new UnitNodeStatus(id, version, DEPLOYED, 0, node1), - new UnitNodeStatus(id, version, OBSOLETE, 0, node1), - new UnitNodeStatus(id, version, REMOVING, 0, node1) + new UnitNodeStatus(id, version, UPLOADING, opId, node1), + new UnitNodeStatus(id, version, DEPLOYED, opId, node1), + new UnitNodeStatus(id, version, OBSOLETE, opId, node1), + new UnitNodeStatus(id, version, REMOVING, opId, node1) ))); } @@ -220,7 +270,7 @@ public class DeploymentUnitStoreImplTest extends BaseIgniteAbstractTest { assertThat(clusterStatusFuture, willCompleteSuccessfully()); UnitClusterStatus clusterStatus = clusterStatusFuture.get(); - long opId = clusterStatus.opId(); + UUID opId = clusterStatus.opId(); assertThat(metastore.updateClusterStatus(id, version, DEPLOYED), willBe(true)); assertThat(metastore.updateClusterStatus(id, version, OBSOLETE), willBe(true)); diff --git a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java index a763fa5ed3..28d1da95a9 100644 --- a/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java +++ b/modules/code-deployment/src/test/java/org/apache/ignite/internal/deployunit/DeploymentManagerImplTest.java @@ -18,6 +18,10 @@ package org.apache.ignite.internal.deployunit; import static java.util.concurrent.CompletableFuture.completedFuture; +import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED; +import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE; +import static org.apache.ignite.internal.deployunit.DeploymentStatus.REMOVING; +import static 
org.apache.ignite.internal.deployunit.DeploymentStatus.UPLOADING; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -26,6 +30,7 @@ import static org.mockito.Mockito.doReturn; import java.nio.file.Path; import java.util.List; import java.util.Set; +import java.util.UUID; import org.apache.ignite.compute.version.Version; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -66,11 +71,11 @@ class DeploymentManagerImplTest extends BaseIgniteAbstractTest { @Test public void detectLatestDeployedVersion() { List<UnitClusterStatus> unitClusterStatuses = List.of( - new UnitClusterStatus("unit", Version.parseVersion("1.0.0"), DeploymentStatus.DEPLOYED, 0, Set.of("node1", "node2")), - new UnitClusterStatus("unit", Version.parseVersion("1.0.1"), DeploymentStatus.OBSOLETE, 0, Set.of("node1", "node2")), - new UnitClusterStatus("unit", Version.parseVersion("1.0.2"), DeploymentStatus.DEPLOYED, 0, Set.of("node1", "node2")), - new UnitClusterStatus("unit", Version.parseVersion("1.0.3"), DeploymentStatus.UPLOADING, 0, Set.of("node1", "node2")), - new UnitClusterStatus("unit", Version.parseVersion("1.0.4"), DeploymentStatus.REMOVING, 0, Set.of("node1", "node2")) + new UnitClusterStatus("unit", Version.parseVersion("1.0.0"), DEPLOYED, UUID.randomUUID(), Set.of("node1", "node2")), + new UnitClusterStatus("unit", Version.parseVersion("1.0.1"), OBSOLETE, UUID.randomUUID(), Set.of("node1", "node2")), + new UnitClusterStatus("unit", Version.parseVersion("1.0.2"), DEPLOYED, UUID.randomUUID(), Set.of("node1", "node2")), + new UnitClusterStatus("unit", Version.parseVersion("1.0.3"), UPLOADING, UUID.randomUUID(), Set.of("node1", "node2")), + new UnitClusterStatus("unit", Version.parseVersion("1.0.4"), REMOVING, 
UUID.randomUUID(), Set.of("node1", "node2")) ); doReturn(completedFuture(unitClusterStatuses)).when(deploymentUnitStore).getClusterStatuses("unit");