This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c679b47303 Don’t finish ongoing decommission and move operations 
during startup
c679b47303 is described below

commit c679b4730332ef67102ec7e47db891be2f8feabf
Author: Marcus Eriksson <[email protected]>
AuthorDate: Tue Oct 22 11:27:44 2024 +0200

    Don’t finish ongoing decommission and move operations during startup
    
    Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20040
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/service/StorageService.java   |  38 ++++-
 .../cassandra/service/StorageServiceMBean.java     |   2 +
 .../apache/cassandra/tcm/MultiStepOperation.java   |   5 +
 src/java/org/apache/cassandra/tcm/Startup.java     |  13 +-
 .../tcm/sequences/InProgressSequences.java         |   7 +
 .../org/apache/cassandra/tcm/sequences/Move.java   |  14 +-
 .../tcm/sequences/SingleNodeSequences.java         |  57 ++++++-
 .../tcm/sequences/UnbootstrapAndLeave.java         |   6 +
 src/java/org/apache/cassandra/tools/NodeProbe.java |  10 ++
 .../org/apache/cassandra/tools/nodetool/Move.java  |  29 +++-
 .../cassandra/distributed/impl/Instance.java       |   1 +
 .../distributed/test/DecommissionTest.java         | 165 ++++++++++-----------
 .../distributed/test/FailingMoveTest.java          | 140 +++++++++++++++++
 .../distributed/test/jmx/JMXGetterCheckTest.java   |   2 +
 15 files changed, 391 insertions(+), 99 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 94c5a9835f..f3b9468f62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Don’t finish ongoing decommission and move operations during startup 
(CASSANDRA-20040)
  * Nodetool reconfigure cms has correct return code when streaming fails 
(CASSANDRA-19972)
  * Reintroduce RestrictionSet#iterator() optimization around multi-column 
restrictions (CASSANDRA-20034)
  * Explicitly localize strings to Locale.US for internal implementation 
(CASSANDRA-19953)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 3b502f5a48..ab8ddfe9bb 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -253,6 +253,8 @@ import static 
org.apache.cassandra.service.ActiveRepairService.repairCommandExec
 import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSIONED;
 import static 
org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
 import static org.apache.cassandra.service.StorageService.Mode.JOINING_FAILED;
+import static org.apache.cassandra.service.StorageService.Mode.LEAVING;
+import static org.apache.cassandra.service.StorageService.Mode.MOVE_FAILED;
 import static org.apache.cassandra.service.StorageService.Mode.NORMAL;
 import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING;
 import static org.apache.cassandra.tcm.membership.NodeState.BOOT_REPLACING;
@@ -454,8 +456,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
     /* the probability for tracing any particular request, 0 disables tracing 
and 1 enables for all */
     private double traceProbability = 0.0;
 
-    public enum Mode { STARTING, NORMAL, JOINING, JOINING_FAILED, LEAVING, 
DECOMMISSIONED, DECOMMISSION_FAILED, MOVING, DRAINING, DRAINED }
-    private volatile Mode operationMode = Mode.STARTING;
+    public enum Mode { STARTING, NORMAL, JOINING, JOINING_FAILED, LEAVING, 
DECOMMISSIONED, DECOMMISSION_FAILED, MOVING, MOVE_FAILED, DRAINING, DRAINED }
 
     /* Can currently hold DECOMMISSIONED, DECOMMISSION_FAILED, DRAINING, 
DRAINED for legacy compatibility. */
     private volatile Optional<Mode> transientMode = Optional.empty();
@@ -763,7 +764,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         });
 
         if (SystemKeyspace.wasDecommissioned())
-            throw new ConfigurationException("This node was decommissioned and 
will not rejoin the ring unless cassandra.override_decommission=true has been 
set, or all existing data is removed and the node is bootstrapped again");
+        {
+            if (CassandraRelevantProperties.OVERRIDE_DECOMMISSION.getBoolean())
+            {
+                logger.warn("This node was decommissioned, but overriding by 
operator request.");
+            }
+            else
+            {
+                throw new ConfigurationException("This node was decommissioned 
and will not rejoin the ring unless cassandra.override_decommission=true has 
been set, or all existing data is removed and the node is bootstrapped again");
+            }
+        }
 
         if (DatabaseDescriptor.getReplaceTokens().size() > 0 || 
DatabaseDescriptor.getReplaceNode() != null)
             throw new RuntimeException("Replace method removed; use 
cassandra.replace_address instead");
@@ -3669,6 +3679,18 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         SingleNodeSequences.move(getTokenFactory().fromString(newToken));
     }
 
+    @Override
+    public void resumeMove()
+    {
+        SingleNodeSequences.resumeMove();
+    }
+
+    @Override
+    public void abortMove()
+    {
+        SingleNodeSequences.abortMove();
+    }
+
     public String getRemovalStatus()
     {
         return getRemovalStatus(false);
@@ -3776,6 +3798,12 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         transientMode = Optional.of(JOINING_FAILED);
     }
 
+    public void markMoveFailed()
+    {
+        logger.info(MOVE_FAILED.toString());
+        transientMode = Optional.of(MOVE_FAILED);
+    }
+
     /*
     - Use system_views.local to get information about the node (todo: we might 
still need a jmx endpoint for that since you can't run cql queries on drained 
etc nodes)
      */
@@ -3845,7 +3873,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public boolean isDecommissioned()
     {
-        return operationMode == DECOMMISSIONED;
+        return operationMode() == DECOMMISSIONED;
     }
 
     public boolean isDecommissionFailed()
@@ -3855,7 +3883,7 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
 
     public boolean isDecommissioning()
     {
-        return operationMode == Mode.LEAVING || operationMode == 
DECOMMISSION_FAILED;
+        return operationMode() == LEAVING;
     }
 
     public boolean isBootstrapFailed()
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8beeb32ba5..f23b3b4543 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -530,6 +530,8 @@ public interface StorageServiceMBean extends 
NotificationEmitter
      * This node will unload its data onto its neighbors, and bootstrap to the 
new token.
      */
     public void move(String newToken) throws IOException;
+    public void resumeMove();
+    public void abortMove();
 
     /**
      * removeToken removes token (and all data associated with
diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java 
b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
index 5fffbd2c84..019086dccd 100644
--- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
+++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
@@ -98,6 +98,11 @@ public abstract class MultiStepOperation<CONTEXT>
         this.latestModification = latestModification;
     }
 
+    public boolean finishDuringStartup()
+    {
+        return true;
+    }
+
     /**
      * Unique identifier for the type of operation, e.g. JOIN, LEAVE, MOVE
      * @return the specific kind of this operation
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java 
b/src/java/org/apache/cassandra/tcm/Startup.java
index 459566ff80..2b60c1561d 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -396,7 +396,7 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
         NodeId self = metadata.myNodeId();
 
         // finish in-progress sequences first
-        InProgressSequences.finishInProgressSequences(self);
+        InProgressSequences.finishInProgressSequences(self, true);
         metadata = ClusterMetadata.current();
 
         switch (metadata.directory.peerState(self))
@@ -407,8 +407,7 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
                     ReconfigureCMS.maybeReconfigureCMS(metadata, 
DatabaseDescriptor.getReplaceAddress());
 
                 
ClusterMetadataService.instance().commit(initialTransformation.get());
-
-                InProgressSequences.finishInProgressSequences(self);
+                InProgressSequences.finishInProgressSequences(self, true); // 
potentially finish the MSO committed above
                 metadata = ClusterMetadata.current();
 
                 if (metadata.directory.peerState(self) == JOINED)
@@ -437,6 +436,14 @@ import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
                                                     "Can't proceed from the 
state " + metadata.directory.peerState(self));
                 }
                 break;
+            case LEAVING:
+                logger.info("Node is currently being decommissioned, resume 
with `nodetool decommission`");
+                StorageService.instance.markDecommissionFailed();
+                break;
+            case MOVING:
+                logger.info("Node is currently moving, resume with nodetool 
move --resume or abort with nodetool move --abort");
+                StorageService.instance.markMoveFailed();
+                break;
             default:
                 throw new IllegalStateException("Can't proceed from the state 
" + metadata.directory.peerState(self));
         }
diff --git 
a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java 
b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
index 86a8ec019a..735a7f6935 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
@@ -60,6 +60,11 @@ public class InProgressSequences implements 
MetadataValue<InProgressSequences>,
     }
 
     public static void 
finishInProgressSequences(MultiStepOperation.SequenceKey sequenceKey)
+    {
+        finishInProgressSequences(sequenceKey, false);
+    }
+
+    public static void 
finishInProgressSequences(MultiStepOperation.SequenceKey sequenceKey, boolean 
onlyStartupSafeSequences)
     {
         ClusterMetadata metadata = ClusterMetadata.current();
         while (true)
@@ -67,6 +72,8 @@ public class InProgressSequences implements 
MetadataValue<InProgressSequences>,
             MultiStepOperation<?> sequence = 
metadata.inProgressSequences.get(sequenceKey);
             if (sequence == null)
                 break;
+            if (onlyStartupSafeSequences && !sequence.finishDuringStartup())
+                break;
             if (isLeave(sequence))
                 StorageService.instance.maybeInitializeServices();
             if (resume(sequence))
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java 
b/src/java/org/apache/cassandra/tcm/sequences/Move.java
index 7375aedc78..09811fba80 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/Move.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java
@@ -151,6 +151,12 @@ public class Move extends MultiStepOperation<Epoch>
         this.streamData = current.streamData;
     }
 
+    @Override
+    public boolean finishDuringStartup()
+    {
+        return false;
+    }
+
     @Override
     public Kind kind()
     {
@@ -199,7 +205,7 @@ public class Move extends MultiStepOperation<Epoch>
                 catch (Throwable t)
                 {
                     JVMStabilityInspector.inspectThrowable(t);
-                    return continuable() ;
+                    return continuable();
                 }
                 break;
             case MID_MOVE:
@@ -251,8 +257,14 @@ public class Move extends MultiStepOperation<Epoch>
                 }
                 catch (ExecutionException e)
                 {
+                    StorageService.instance.markMoveFailed();
                     throw new RuntimeException("Unable to move", e);
                 }
+                catch (Exception e)
+                {
+                    StorageService.instance.markMoveFailed();
+                    throw e;
+                }
 
                 try
                 {
diff --git 
a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java 
b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
index 69c071120f..7813fb1492 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.MultiStepOperation;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
 import org.apache.cassandra.tcm.transformations.PrepareMove;
 
@@ -75,13 +76,17 @@ public interface SingleNodeSequences
 
         if (inProgress == null)
         {
-            logger.info("starting decom with {} {}", metadata.epoch, self);
+            logger.info("starting decommission with {} {}", metadata.epoch, 
self);
             ClusterMetadataService.instance().commit(new PrepareLeave(self,
                                                                       force,
                                                                       
ClusterMetadataService.instance().placementProvider(),
                                                                       
LeaveStreams.Kind.UNBOOTSTRAP));
         }
-        else if (!InProgressSequences.isLeave(inProgress))
+        else if (InProgressSequences.isLeave(inProgress))
+        {
+            logger.info("Resuming decommission @ {} (current epoch = {}): {}", 
inProgress.latestModification, metadata.epoch, inProgress.status());
+        }
+        else
         {
             throw new IllegalArgumentException("Can not decommission a node 
that has an in-progress sequence");
         }
@@ -165,4 +170,52 @@ public interface SingleNodeSequences
             logger.debug("Successfully moved to new token {}", 
StorageService.instance.getLocalTokens().iterator().next());
     }
 
+    static void resumeMove()
+    {
+        if (ClusterMetadataService.instance().isMigrating() || 
ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP)
+            throw new IllegalStateException("This cluster is migrating to 
cluster metadata, can't move until that is done.");
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+        NodeId self = metadata.myNodeId();
+        MultiStepOperation<?> sequence = 
metadata.inProgressSequences.get(self);
+        if (sequence == null || sequence.kind() != 
MultiStepOperation.Kind.MOVE)
+        {
+            String msg = "No move operation in progress, can't resume";
+            logger.info(msg);
+            throw new IllegalStateException(msg);
+        }
+        if (StorageService.instance.operationMode() != 
StorageService.Mode.MOVE_FAILED)
+        {
+            String msg = "Can't resume a move operation unless it has failed";
+            logger.info(msg);
+            throw new IllegalStateException(msg);
+        }
+        StorageService.instance.clearTransientMode();
+        InProgressSequences.finishInProgressSequences(self);
+    }
+
+    static void abortMove()
+    {
+        if (ClusterMetadataService.instance().isMigrating() || 
ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP)
+            throw new IllegalStateException("This cluster is migrating to 
cluster metadata, can't move until that is done.");
+
+        ClusterMetadata metadata = ClusterMetadata.current();
+        NodeId self = metadata.myNodeId();
+        MultiStepOperation<?> sequence = 
metadata.inProgressSequences.get(self);
+        if (sequence == null || sequence.kind() != 
MultiStepOperation.Kind.MOVE)
+        {
+            String msg = "No move operation in progress, can't abort";
+            logger.info(msg);
+            throw new IllegalStateException(msg);
+        }
+        if (StorageService.instance.operationMode() != 
StorageService.Mode.MOVE_FAILED)
+        {
+            String msg = "Can't abort a move operation unless it has failed";
+            logger.info(msg);
+            throw new IllegalStateException(msg);
+        }
+        StorageService.instance.clearTransientMode();
+        ClusterMetadataService.instance().commit(new 
CancelInProgressSequence(self));
+    }
+
 }
diff --git 
a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java 
b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
index bd93d7b84c..9e1d1ea62c 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
@@ -292,6 +292,12 @@ public class UnbootstrapAndLeave extends 
MultiStepOperation<Epoch>
         }
     }
 
+    @Override
+    public boolean finishDuringStartup()
+    {
+        return false;
+    }
+
     @Override
     public String toString()
     {
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 72ed9d3aec..5c75985bda 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1010,6 +1010,16 @@ public class NodeProbe implements AutoCloseable
         ssProxy.move(newToken);
     }
 
+    public void resumeMove()
+    {
+        ssProxy.resumeMove();
+    }
+
+    public void abortMove()
+    {
+        ssProxy.abortMove();
+    }
+
     public void removeNode(String token)
     {
         removeNode(token, false);
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Move.java 
b/src/java/org/apache/cassandra/tools/nodetool/Move.java
index 075e008503..87c085b86b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Move.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Move.java
@@ -23,21 +23,46 @@ import io.airlift.airline.Command;
 
 import java.io.IOException;
 
+import io.airlift.airline.Option;
 import org.apache.cassandra.tools.NodeProbe;
 import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 
 @Command(name = "move", description = "Move node on the token ring to a new 
token")
 public class Move extends NodeToolCmd
 {
-    @Arguments(usage = "<new token>", description = "The new token.", required 
= true)
+    @Arguments(usage = "<new token>", description = "The new token.")
     private String newToken = EMPTY;
 
+    @Option(title = "Resume an ongoing move operation", name = "--resume")
+    private boolean resume;
+
+    @Option(title = "Abort an ongoing move operation", name = "--abort")
+    private boolean abort;
+
     @Override
     public void execute(NodeProbe probe)
     {
         try
         {
-            probe.move(newToken);
+            if (!newToken.isEmpty())
+            {
+                if (resume || abort)
+                    throw new IllegalArgumentException("Can't give both a 
token and --resume/--abort");
+
+                probe.move(newToken);
+            }
+            else
+            {
+                if (abort && resume)
+                    throw new IllegalArgumentException("Can't both resume and 
abort");
+
+                if (resume)
+                    probe.resumeMove();
+                else if (abort)
+                    probe.abortMove();
+                else
+                    throw new IllegalArgumentException("Need to give either a 
token for a new move operation, or --resume/--abort for an existing one");
+            }
         } catch (IOException e)
         {
             throw new RuntimeException("Error during moving node", e);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 296f95b9b4..fe4a25be43 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -636,6 +636,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
             }
             catch (Throwable t)
             {
+                startedAt.set(0);
                 if (t instanceof RuntimeException)
                     throw (RuntimeException) t;
                 throw new RuntimeException(t);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
index e2360aca2b..ddee1fef63 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
@@ -27,9 +28,13 @@ import net.bytebuddy.ByteBuddy;
 import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
 import net.bytebuddy.implementation.MethodDelegation;
 import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.action.GossipHelper;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.PlacementDeltas;
 import org.apache.cassandra.tcm.sequences.UnbootstrapStreams;
@@ -41,7 +46,6 @@ import static 
org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 import static 
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
 import static 
org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
-import static org.apache.cassandra.service.StorageService.Mode.NORMAL;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -59,62 +63,31 @@ public class DecommissionTest extends TestBaseImpl
                                            .start()))
         {
             IInvokableInstance instance = cluster.get(2);
-
+            assertBootstrapState(instance, COMPLETED);
+            instance.nodetoolResult("decommission", "--force")
+                    .asserts()
+                    .failure()
+                    .stderrContains("simulated error in 
prepareUnbootstrapStreaming");
+            instance.runOnInstance(() -> {
+                                       
assertFalse(StorageService.instance.isDecommissioning());
+                                       
assertTrue(StorageService.instance.isDecommissionFailed());
+                                   });
+
+            // still COMPLETED, nothing has changed
+            assertBootstrapState(instance, COMPLETED);
+            assertOperationMode(instance, DECOMMISSION_FAILED);
+            instance.nodetoolResult("decommission", 
"--force").asserts().success();
+            instance.runOnInstance(() -> {
+                                       
assertFalse(StorageService.instance.isDecommissionFailed());
+                                       
assertFalse(StorageService.instance.isDecommissioning());
+                                   });
+            assertBootstrapState(instance, DECOMMISSIONED);
+            instance.nodetoolResult("decommission", "--force")
+                    .asserts()
+                    .success()
+                    .stdoutContains("Node was already decommissioned");
+            assertBootstrapState(instance, DECOMMISSIONED);
             instance.runOnInstance(() -> {
-
-                assertEquals(COMPLETED.name(), 
StorageService.instance.getBootstrapState());
-
-                // pretend that decommissioning has failed in the middle
-
-                try
-                {
-                    StorageService.instance.decommission(true);
-                    fail("the first attempt to decommission should fail");
-                }
-                catch (Throwable t)
-                {
-                    assertTrue(t.getMessage().contains("simulated error in 
prepareUnbootstrapStreaming"));
-                }
-
-                assertFalse(StorageService.instance.isDecommissioning());
-                assertTrue(StorageService.instance.isDecommissionFailed());
-
-                // still COMPLETED, nothing has changed
-                assertEquals(COMPLETED.name(), 
StorageService.instance.getBootstrapState());
-
-                String operationMode = 
StorageService.instance.getOperationMode();
-                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
-
-                // try to decommission again, now successfully
-
-                try
-                {
-                    StorageService.instance.decommission(true);
-
-                    // decommission was successful, so we reset failed 
decommission mode
-                    
assertFalse(StorageService.instance.isDecommissionFailed());
-
-                    assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
-                    assertFalse(StorageService.instance.isDecommissioning());
-                }
-                catch (Throwable t)
-                {
-                    fail("the second decommission attempt should pass but it 
failed on: " + t.getMessage());
-                }
-
-                assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
-                assertFalse(StorageService.instance.isDecommissionFailed());
-
-                try
-                {
-                    StorageService.instance.decommission(true);
-                    fail("Should have failed since the node is in 
decomissioned state");
-                }
-                catch (UnsupportedOperationException e)
-                {
-                    // ignore
-                }
-                assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
                 assertFalse(StorageService.instance.isDecommissionFailed());
                 assertFalse(StorageService.instance.isDecommissioning());
             });
@@ -137,46 +110,27 @@ public class DecommissionTest extends TestBaseImpl
                                            .start()))
         {
             IInvokableInstance instance = cluster.get(2);
-
-            instance.runOnInstance(() -> {
-                assertEquals(COMPLETED.name(), 
StorageService.instance.getBootstrapState());
-
-                // pretend that decommissioning has failed in the middle
-
-                try
-                {
-                    StorageService.instance.decommission(true);
-                    fail("the first attempt to decommission should fail");
-                }
-                catch (Throwable t)
-                {
-                    assertTrue(t.getMessage().contains("simulated error in 
prepareUnbootstrapStreaming"));
-                }
-
-                // node is in DECOMMISSION_FAILED mode
-                String operationMode = 
StorageService.instance.getOperationMode();
-                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
-            });
-
+            assertBootstrapState(instance, COMPLETED);
+            // pretend that decommissioning has failed in the middle
+            instance.nodetoolResult("decommission", "--force")
+                    .asserts()
+                    .failure()
+                    .stderrContains("simulated error in 
prepareUnbootstrapStreaming");
+            assertOperationMode(instance, DECOMMISSION_FAILED);
             // restart the node which we failed to decommission
             stopUnchecked(instance);
             instance.startup();
-
-            // it is back to normal so let's decommission again
-
-            String oprationMode = instance.callOnInstance(() -> 
StorageService.instance.getOperationMode());
-            assertEquals(NORMAL.name(), oprationMode);
-
+            // it starts up as DECOMMISSION_FAILED so let's decommission again
+            assertOperationMode(instance, DECOMMISSION_FAILED);
+            instance.nodetoolResult("decommission", 
"--force").asserts().success();
+            assertBootstrapState(instance, DECOMMISSIONED);
             instance.runOnInstance(() -> {
-                StorageService.instance.decommission(true);
-                assertEquals(DECOMMISSIONED.name(), 
StorageService.instance.getBootstrapState());
                 assertFalse(StorageService.instance.isDecommissionFailed());
                 assertFalse(StorageService.instance.isDecommissioning());
             });
         }
     }
 
-
     public static class BB
     {
         public static void install(ClassLoader classLoader, Integer num)
@@ -208,4 +162,43 @@ public class DecommissionTest extends TestBaseImpl
             }
         }
     }
+
+    @Test
+    public void testRestartDecommedNode() throws IOException, 
ExecutionException, InterruptedException
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> 
config.with(GOSSIP)
+                                                                       
.with(NETWORK))
+                                           .start()))
+        {
+            cluster.get(2).nodetoolResult("decommission", 
"--force").asserts().success();
+            cluster.get(2).shutdown().get();
+            try
+            {
+                cluster.get(2).startup();
+                fail();
+            }
+            catch (Exception e)
+            {
+                cluster.get(2).runOnInstance(() -> 
ClusterMetadataService.unsetInstance());
+                assertTrue(e.getMessage().contains("This node was 
decommissioned and will not rejoin the ring unless 
cassandra.override_decommission=true"));
+            }
+
+            
GossipHelper.withProperty(CassandraRelevantProperties.OVERRIDE_DECOMMISSION, 
true, () -> cluster.get(2).startup());
+            assertBootstrapState(cluster.get(2), COMPLETED);
+        }
+    }
+
+    private static void assertBootstrapState(IInvokableInstance i, 
SystemKeyspace.BootstrapState expectedState)
+    {
+        String bootstrapState = expectedState.name();
+        i.runOnInstance(() -> assertEquals(bootstrapState, 
SystemKeyspace.getBootstrapState().name()));
+    }
+
+    private static void assertOperationMode(IInvokableInstance i, 
StorageService.Mode mode)
+    {
+        String operationMode = mode.name();
+        i.runOnInstance(() -> assertEquals(operationMode, 
StorageService.instance.operationMode().name()));
+    }
+
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/FailingMoveTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/FailingMoveTest.java
new file mode 100644
index 0000000000..ac60f90cd3
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/FailingMoveTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
/**
 * Distributed tests for recovering from a token move whose streaming phase fails.
 * Streaming is made to fail deterministically by intercepting {@link StreamPlan#execute}
 * with Byte Buddy; the tests then exercise {@code nodetool move --resume} and
 * {@code nodetool move --abort} (introduced for CASSANDRA-20040).
 */
public class FailingMoveTest extends TestBaseImpl
{
    /**
     * A move that fails mid-stream leaves the node in MOVE_FAILED; once the fault is
     * cleared, {@code nodetool move --resume} completes the move: the node returns to
     * NORMAL and ends up owning the target token.
     */
    @Test
    public void testResumeMove() throws IOException
    {
        try (Cluster cluster = init(Cluster.build(3)
                                           .withoutVNodes()
                                           .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
                                           .withInstanceInitializer(BB::install)
                                           .start()))
        {
            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl(id int primary key);"));
            // Write some data at ALL so the move actually has something to stream.
            for (int i=0; i<30; i++)
                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (id) VALUES (?)"),
                                               ConsistencyLevel.ALL, i);
            String oldToken = getToken(cluster.get(3));
            String moveToToken = "2305843009213693949";
            // Sanity check: the target token must differ from the current one,
            // otherwise the move would be a no-op and the test meaningless.
            assertNotEquals(oldToken, moveToToken);
            // First attempt fails: BB.shouldFail is true, so StreamPlan.execute throws.
            cluster.get(3).nodetoolResult("move", moveToToken).asserts().failure();
            cluster.get(3).runOnInstance(() -> {
                assertEquals(StorageService.Mode.MOVE_FAILED, StorageService.instance.operationMode());
                // Clear the injected fault (inside the instance classloader) so the
                // resumed streaming can succeed.
                BB.shouldFail.set(false);
            });

            // Resuming picks the in-progress move back up and drives it to completion.
            cluster.get(3).nodetoolResult("move", "--resume").asserts().success();
            cluster.get(3).runOnInstance(() -> assertEquals(StorageService.Mode.NORMAL, StorageService.instance.operationMode()));
            // The node now owns the requested token.
            assertEquals(moveToToken, getToken(cluster.get(3)));
        }
    }

    /**
     * Mirror of {@link #testResumeMove()}: after the streaming failure,
     * {@code nodetool move --abort} rolls the operation back — the node returns to
     * NORMAL and keeps its original token (i.e. does NOT end up on the target token).
     */
    @Test
    public void testAbortMove() throws IOException
    {
        try (Cluster cluster = init(Cluster.build(3)
                                           .withoutVNodes()
                                           .withConfig(c -> c.with(Feature.GOSSIP, Feature.NETWORK))
                                           .withInstanceInitializer(BB::install)
                                           .start()))
        {
            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl(id int primary key);"));
            for (int i=0; i<30; i++)
                cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.tbl (id) VALUES (?)"),
                                               ConsistencyLevel.ALL, i);
            String oldToken = getToken(cluster.get(3));
            String moveToToken = "2305843009213693949";
            assertNotEquals(oldToken, moveToToken);
            // Injected streaming failure leaves the node in MOVE_FAILED.
            cluster.get(3).nodetoolResult("move", moveToToken).asserts().failure();
            cluster.get(3).runOnInstance(() -> {
                assertEquals(StorageService.Mode.MOVE_FAILED, StorageService.instance.operationMode());
                BB.shouldFail.set(false);
            });

            // Abort instead of resume: the move is cancelled.
            cluster.get(3).nodetoolResult("move", "--abort").asserts().success();
            cluster.get(3).runOnInstance(() -> assertEquals(StorageService.Mode.NORMAL, StorageService.instance.operationMode()));
            // The target token must NOT have been adopted after the abort.
            assertNotEquals(moveToToken, getToken(cluster.get(3)));
        }
    }

    /**
     * Returns the (single, thanks to {@code withoutVNodes()}) token the given
     * instance currently owns according to its cluster metadata, as a String.
     */
    private String getToken(IInvokableInstance instance)
    {
        return instance.callsOnInstance(() -> {
            NodeId self = ClusterMetadata.current().myNodeId();
            return ClusterMetadata.current().tokenMap.tokens(self).iterator().next().toString();
        }).call();
    }

    /**
     * Byte Buddy instrumentation installed into each instance's classloader:
     * rebases {@link StreamPlan} so that {@code execute()} throws while
     * {@link #shouldFail} is true, simulating a streaming failure during the move.
     */
    public static class BB
    {
        // Toggled off from inside the instance (via runOnInstance) to let a
        // subsequent resume attempt stream successfully. Note each instance
        // classloader gets its own copy of this field.
        static AtomicBoolean shouldFail = new AtomicBoolean(true);
        public static void install(ClassLoader classLoader, Integer num)
        {
            new ByteBuddy().rebase(StreamPlan.class)
                           .method(named("execute"))
                           .intercept(MethodDelegation.to(BB.class))
                           .make()
                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
        }

        /**
         * Interceptor for {@code StreamPlan.execute()}: throws while the fault is
         * armed, otherwise delegates to the original implementation.
         */
        @SuppressWarnings("unused")
        public static StreamResultFuture execute(@SuperCall Callable<StreamResultFuture> zuper)
        {
            if (shouldFail.get())
                throw new RuntimeException("failing stream");

            try
            {
                return zuper.call();
            }
            catch (Exception e)
            {
                // Callable.call is declared to throw checked exceptions the
                // intercepted signature cannot; rewrap, preserving the cause.
                throw new RuntimeException(e);
            }
        }
    }

}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
index fa379ffdc1..ea95003093 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -59,6 +59,8 @@ public class JMXGetterCheckTest extends TestBaseImpl
     "org.apache.cassandra.db:type=StorageService:clearConnectionHistory", // 
Throws a NullPointerException
     "org.apache.cassandra.db:type=StorageService:startGossiping", // causes 
multiple loops to fail
     "org.apache.cassandra.db:type=StorageService:startNativeTransport", // 
causes multiple loops to fail
+    "org.apache.cassandra.db:type=StorageService:resumeMove", // throws since 
there is no move in progress
+    "org.apache.cassandra.db:type=StorageService:abortMove", // throws since 
there is no move in progress
     
"org.apache.cassandra.db:type=CIDRGroupsMappingManager:loadCidrGroupsCache", // 
AllowAllCIDRAuthorizer doesn't support this operation, as feature is disabled 
by default
     "org.apache.cassandra.db:type=StorageService:forceRemoveCompletion" // 
deprecated (TCM)
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to