petrov-mg commented on code in PR #13195:
URL: https://github.com/apache/ignite/pull/13195#discussion_r3443423205


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/rollingupgrade/RollingUpgradeProcessor.java:
##########
@@ -17,331 +17,564 @@
 
 package org.apache.ignite.internal.processors.rollingupgrade;
 
-import java.util.Objects;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteVersionUtils;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage;
-import org.apache.ignite.internal.processors.metastorage.DistributedMetastorageLifecycleListener;
-import org.apache.ignite.internal.processors.metastorage.ReadableDistributedMetaStorage;
 import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
-import org.apache.ignite.internal.util.lang.IgnitePair;
+import org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeature;
+import org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeatureManager;
+import org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteFeatureSet;
+import org.apache.ignite.internal.processors.rollingupgrade.feature.IgniteProductFeatures;
+import org.apache.ignite.internal.processors.rollingupgrade.feature.SupportedFeaturesRegistry;
+import org.apache.ignite.internal.util.distributed.DistributedProcess;
+import org.apache.ignite.internal.util.distributed.InitMessage;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.lang.IgniteProductVersion;
-import org.apache.ignite.plugin.security.SecurityPermission;
+import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteNodeValidationResult;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.events.EventType.EVT_NODE_VALIDATION_FAILED;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
-import static org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage.IGNITE_INTERNAL_KEY_PREFIX;
-
-/** Rolling upgrade processor. Manages current and target versions of cluster. */
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.ROLLING_UPGRADE_PROC;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_FEATURES;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_COMPLETE_VERSION_FINALIZATION;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_ENABLE;
+import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.RU_PREPARE_VERSION_FINALIZATION;
+import static org.apache.ignite.plugin.security.SecurityPermission.ADMIN_ROLLING_UPGRADE;
+
+/** */
 public class RollingUpgradeProcessor extends GridProcessorAdapter implements DiscoveryNodeValidationProcessor {
-    /** Key for the distributed property that holds current and target versions. */
-    private static final String ROLLING_UPGRADE_VERSIONS_KEY = IGNITE_INTERNAL_KEY_PREFIX + "rolling.upgrade.versions";
+    /** */
+    private final IgniteFeatureManager featureMgr;
 
-    /** Metastorage with the write access. */
-    @Nullable private volatile DistributedMetaStorage metastorage;
+    /** */
+    private final ClusterVersionUpgradeEnableProcess enableProc;
+
+    /** */
+    private final ClusterVersionFinalizationProcess finalizeProc;
 
-    /** Last joining node. */
-    private ClusterNode lastJoiningNode;
+    /** */
+    private final Object topGuard = new Object();
 
-    /** Lock for synchronization between tcp-disco-msg-worker thread and management operations. */
-    private final Object lock = new Object();
+    /** */
+    private final Set<ClusterNode> joiningNodes = new HashSet<>();
 
     /** */
-    private final CountDownLatch startLatch = new CountDownLatch(1);
+    private volatile boolean isNodeFenceActive;
 
-    /** Pair with current and target versions. {@code null} when rolling upgrade is disabled. */
-    @Nullable private volatile IgnitePair<IgniteProductVersion> rollUpVers;
+    /** */
+    private volatile boolean isVerUpgradeEnabled;
 
-    /**
-     * @param ctx Context.
-     */
+    /** */
     public RollingUpgradeProcessor(GridKernalContext ctx) {
+        this(ctx, () -> new IgniteProductFeatures(
+            IgniteVersionUtils.VER,
+            IgniteFeatureSet.buildFrom(SupportedFeaturesRegistry.class))
+        );
+    }
+
+    /** */
+    protected RollingUpgradeProcessor(GridKernalContext ctx, Supplier<IgniteProductFeatures> locVerFeaturesProv) {
         super(ctx);
+
+        enableProc = new ClusterVersionUpgradeEnableProcess();
+        finalizeProc = new ClusterVersionFinalizationProcess();
+        featureMgr = new IgniteFeatureManager(ctx, locVerFeaturesProv);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onKernalStart(boolean active) throws IgniteCheckedException {
-        startLatch.countDown();
+    /** @return Whether nodes running a higher Ignite version are allowed to join the cluster. */
+    public boolean isVersionUpgradeEnabled() {
+        return isVerUpgradeEnabled;
+    }
+
+    /**
+     * Allows nodes running a higher Ignite version to join the cluster.
+     * Until the cluster version is finalized, nodes running a higher version operate in
+     * a compatibility mode that emulates the behavior of the current cluster version.
+     */
+    public void enableVersionUpgrade() throws IgniteCheckedException {
+        ctx.security().authorize(ADMIN_ROLLING_UPGRADE);
+
+        if (isVerUpgradeEnabled)
+            return;
+
+        enableProc.start().get();
+
+        if (log.isInfoEnabled())
+            log.info("Cluster version Rolling Upgrade was successfully enabled");
+    }
+
+    /**
+     * Tries to finalize the cluster version.
+     *
+     * <p>If all cluster nodes are running the same Ignite version, this method:</p>
+     * <ol>
+     *     <li>Prevents nodes running a higher Ignite version from joining the cluster.</li>
+     *     <li>Activates all {@link IgniteFeature}s supported by the finalized cluster version.</li>
+     * </ol>
+     *
+     * <p>If the cluster contains nodes running different Ignite versions, the operation fails.</p>
+     */
+    public void finalizeClusterVersion() throws IgniteCheckedException {
+        ctx.security().authorize(ADMIN_ROLLING_UPGRADE);
+
+        if (!isVerUpgradeEnabled)
+            return;
+
+        finalizeProc.start().get();
+
+        if (log.isInfoEnabled())
+            log.info("Cluster version was successfully finalized [activeLogicalVer=" + clusterLogicalVersion() + ']');
+    }
+
+    /** */
+    public IgniteFeatureManager features() {
+        return featureMgr;
+    }
+
+    /** */
+    RollingUpgradeState state() {
+        synchronized (topGuard) {
+            return detectRollingUpgradeState();
+        }
     }
 
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
+        ctx.addNodeAttribute(
+            ATTR_IGNITE_FEATURES,
+            U.marshal(ctx.marshallerContext().jdkMarshaller(), featureMgr.localVersionFeatures().features()));
+
         ctx.event().addLocalEventListener(
             evt -> {
-                UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
-
-                synchronized (lock) {
-                    if (lastJoiningNode != null && lastJoiningNode.id().equals(nodeId))
-                        lastJoiningNode = null;
+                synchronized (topGuard) {
+                    joiningNodes.remove(((DiscoveryEvent)evt).eventNode());
                 }
             },
             EVT_NODE_JOINED,
             EVT_NODE_FAILED,
             EVT_NODE_LEFT,
             EVT_NODE_VALIDATION_FAILED
         );
+    }
+
+    /** {@inheritDoc} */
+    @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return ROLLING_UPGRADE_PROC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+        if (ctx.clientNode())
+            return;
+
+        int cmpId = discoveryDataType().ordinal();
+
+        if (!dataBag.commonDataCollectedFor(cmpId))
+            dataBag.addGridCommonData(cmpId, collectRollingUpgradeNodeData());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data) {
+        RollingUpgradeNodeData gridData = (RollingUpgradeNodeData)data.commonData();
+
+        isVerUpgradeEnabled = gridData.isVersionUpgradeEnabled();
+        isNodeFenceActive = gridData.isNodeFenceActive();
 
-        ctx.internalSubscriptionProcessor().registerDistributedMetastorageListener(new DistributedMetastorageLifecycleListener() {
-            @Override public void onReadyForWrite(DistributedMetaStorage metastorage) {
-                RollingUpgradeProcessor.this.metastorage = metastorage;
+        featureMgr.onGridDataReceived(gridData.activeFeatures());
+    }
+
+    /** {@inheritDoc} */
+    @Override public @Nullable IgniteNodeValidationResult validateNode(ClusterNode joiningNode) {
+        synchronized (topGuard) {
+            if (isNodeFenceActive) {
+                return new IgniteNodeValidationResult(
+                    joiningNode.id(),
+                    "Node joins are not allowed during cluster version finalization [joiningNode=" + joiningNode + ']');

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to