This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c679b47303 Don’t finish ongoing decommission and move operations during startup
c679b47303 is described below
commit c679b4730332ef67102ec7e47db891be2f8feabf
Author: Marcus Eriksson <[email protected]>
AuthorDate: Tue Oct 22 11:27:44 2024 +0200
Don’t finish ongoing decommission and move operations during startup
Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20040
---
CHANGES.txt | 1 +
.../apache/cassandra/service/StorageService.java | 38 ++++-
.../cassandra/service/StorageServiceMBean.java | 2 +
.../apache/cassandra/tcm/MultiStepOperation.java | 5 +
src/java/org/apache/cassandra/tcm/Startup.java | 13 +-
.../tcm/sequences/InProgressSequences.java | 7 +
.../org/apache/cassandra/tcm/sequences/Move.java | 14 +-
.../tcm/sequences/SingleNodeSequences.java | 57 ++++++-
.../tcm/sequences/UnbootstrapAndLeave.java | 6 +
src/java/org/apache/cassandra/tools/NodeProbe.java | 10 ++
.../org/apache/cassandra/tools/nodetool/Move.java | 29 +++-
.../cassandra/distributed/impl/Instance.java | 1 +
.../distributed/test/DecommissionTest.java | 165 ++++++++++-----------
.../distributed/test/FailingMoveTest.java | 140 +++++++++++++++++
.../distributed/test/jmx/JMXGetterCheckTest.java | 2 +
15 files changed, 391 insertions(+), 99 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 94c5a9835f..f3b9468f62 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Don’t finish ongoing decommission and move operations during startup
(CASSANDRA-20040)
* Nodetool reconfigure cms has correct return code when streaming fails
(CASSANDRA-19972)
* Reintroduce RestrictionSet#iterator() optimization around multi-column
restrictions (CASSANDRA-20034)
* Explicitly localize strings to Locale.US for internal implementation
(CASSANDRA-19953)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 3b502f5a48..ab8ddfe9bb 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -253,6 +253,8 @@ import static
org.apache.cassandra.service.ActiveRepairService.repairCommandExec
import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSIONED;
import static
org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
import static org.apache.cassandra.service.StorageService.Mode.JOINING_FAILED;
+import static org.apache.cassandra.service.StorageService.Mode.LEAVING;
+import static org.apache.cassandra.service.StorageService.Mode.MOVE_FAILED;
import static org.apache.cassandra.service.StorageService.Mode.NORMAL;
import static org.apache.cassandra.tcm.membership.NodeState.BOOTSTRAPPING;
import static org.apache.cassandra.tcm.membership.NodeState.BOOT_REPLACING;
@@ -454,8 +456,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
/* the probability for tracing any particular request, 0 disables tracing
and 1 enables for all */
private double traceProbability = 0.0;
- public enum Mode { STARTING, NORMAL, JOINING, JOINING_FAILED, LEAVING,
DECOMMISSIONED, DECOMMISSION_FAILED, MOVING, DRAINING, DRAINED }
- private volatile Mode operationMode = Mode.STARTING;
+ public enum Mode { STARTING, NORMAL, JOINING, JOINING_FAILED, LEAVING,
DECOMMISSIONED, DECOMMISSION_FAILED, MOVING, MOVE_FAILED, DRAINING, DRAINED }
/* Can currently hold DECOMMISSIONED, DECOMMISSION_FAILED, DRAINING,
DRAINED for legacy compatibility. */
private volatile Optional<Mode> transientMode = Optional.empty();
@@ -763,7 +764,16 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
});
if (SystemKeyspace.wasDecommissioned())
- throw new ConfigurationException("This node was decommissioned and
will not rejoin the ring unless cassandra.override_decommission=true has been
set, or all existing data is removed and the node is bootstrapped again");
+ {
+ if (CassandraRelevantProperties.OVERRIDE_DECOMMISSION.getBoolean())
+ {
+ logger.warn("This node was decommissioned, but overriding by
operator request.");
+ }
+ else
+ {
+ throw new ConfigurationException("This node was decommissioned
and will not rejoin the ring unless cassandra.override_decommission=true has
been set, or all existing data is removed and the node is bootstrapped again");
+ }
+ }
if (DatabaseDescriptor.getReplaceTokens().size() > 0 ||
DatabaseDescriptor.getReplaceNode() != null)
throw new RuntimeException("Replace method removed; use
cassandra.replace_address instead");
@@ -3669,6 +3679,18 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
SingleNodeSequences.move(getTokenFactory().fromString(newToken));
}
+ @Override
+ public void resumeMove()
+ {
+ SingleNodeSequences.resumeMove();
+ }
+
+ @Override
+ public void abortMove()
+ {
+ SingleNodeSequences.abortMove();
+ }
+
public String getRemovalStatus()
{
return getRemovalStatus(false);
@@ -3776,6 +3798,12 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
transientMode = Optional.of(JOINING_FAILED);
}
+ public void markMoveFailed()
+ {
+ logger.info(MOVE_FAILED.toString());
+ transientMode = Optional.of(MOVE_FAILED);
+ }
+
/*
- Use system_views.local to get information about the node (todo: we might
still need a jmx endpoint for that since you can't run cql queries on drained
etc nodes)
*/
@@ -3845,7 +3873,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
public boolean isDecommissioned()
{
- return operationMode == DECOMMISSIONED;
+ return operationMode() == DECOMMISSIONED;
}
public boolean isDecommissionFailed()
@@ -3855,7 +3883,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
public boolean isDecommissioning()
{
- return operationMode == Mode.LEAVING || operationMode ==
DECOMMISSION_FAILED;
+ return operationMode() == LEAVING;
}
public boolean isBootstrapFailed()
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 8beeb32ba5..f23b3b4543 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -530,6 +530,8 @@ public interface StorageServiceMBean extends
NotificationEmitter
* This node will unload its data onto its neighbors, and bootstrap to the
new token.
*/
public void move(String newToken) throws IOException;
+ public void resumeMove();
+ public void abortMove();
/**
* removeToken removes token (and all data associated with
diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
index 5fffbd2c84..019086dccd 100644
--- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
+++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
@@ -98,6 +98,11 @@ public abstract class MultiStepOperation<CONTEXT>
this.latestModification = latestModification;
}
+ public boolean finishDuringStartup()
+ {
+ return true;
+ }
+
/**
* Unique identifier for the type of operation, e.g. JOIN, LEAVE, MOVE
* @return the specific kind of this operation
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index 459566ff80..2b60c1561d 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -396,7 +396,7 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
NodeId self = metadata.myNodeId();
// finish in-progress sequences first
- InProgressSequences.finishInProgressSequences(self);
+ InProgressSequences.finishInProgressSequences(self, true);
metadata = ClusterMetadata.current();
switch (metadata.directory.peerState(self))
@@ -407,8 +407,7 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
ReconfigureCMS.maybeReconfigureCMS(metadata,
DatabaseDescriptor.getReplaceAddress());
ClusterMetadataService.instance().commit(initialTransformation.get());
-
- InProgressSequences.finishInProgressSequences(self);
+ InProgressSequences.finishInProgressSequences(self, true); //
potentially finish the MSO committed above
metadata = ClusterMetadata.current();
if (metadata.directory.peerState(self) == JOINED)
@@ -437,6 +436,14 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
"Can't proceed from the
state " + metadata.directory.peerState(self));
}
break;
+ case LEAVING:
+ logger.info("Node is currently being decommissioned, resume
with `nodetool decommission`");
+ StorageService.instance.markDecommissionFailed();
+ break;
+ case MOVING:
+ logger.info("Node is currently moving, resume with nodetool
move --resume or abort with nodetool move --abort");
+ StorageService.instance.markMoveFailed();
+ break;
default:
throw new IllegalStateException("Can't proceed from the state
" + metadata.directory.peerState(self));
}
diff --git a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
index 86a8ec019a..735a7f6935 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/InProgressSequences.java
@@ -60,6 +60,11 @@ public class InProgressSequences implements
MetadataValue<InProgressSequences>,
}
public static void
finishInProgressSequences(MultiStepOperation.SequenceKey sequenceKey)
+ {
+ finishInProgressSequences(sequenceKey, false);
+ }
+
+ public static void
finishInProgressSequences(MultiStepOperation.SequenceKey sequenceKey, boolean
onlyStartupSafeSequences)
{
ClusterMetadata metadata = ClusterMetadata.current();
while (true)
@@ -67,6 +72,8 @@ public class InProgressSequences implements
MetadataValue<InProgressSequences>,
MultiStepOperation<?> sequence =
metadata.inProgressSequences.get(sequenceKey);
if (sequence == null)
break;
+ if (onlyStartupSafeSequences && !sequence.finishDuringStartup())
+ break;
if (isLeave(sequence))
StorageService.instance.maybeInitializeServices();
if (resume(sequence))
diff --git a/src/java/org/apache/cassandra/tcm/sequences/Move.java b/src/java/org/apache/cassandra/tcm/sequences/Move.java
index 7375aedc78..09811fba80 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/Move.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/Move.java
@@ -151,6 +151,12 @@ public class Move extends MultiStepOperation<Epoch>
this.streamData = current.streamData;
}
+ @Override
+ public boolean finishDuringStartup()
+ {
+ return false;
+ }
+
@Override
public Kind kind()
{
@@ -199,7 +205,7 @@ public class Move extends MultiStepOperation<Epoch>
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
- return continuable() ;
+ return continuable();
}
break;
case MID_MOVE:
@@ -251,8 +257,14 @@ public class Move extends MultiStepOperation<Epoch>
}
catch (ExecutionException e)
{
+ StorageService.instance.markMoveFailed();
throw new RuntimeException("Unable to move", e);
}
+ catch (Exception e)
+ {
+ StorageService.instance.markMoveFailed();
+ throw e;
+ }
try
{
diff --git a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
index 69c071120f..7813fb1492 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/SingleNodeSequences.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.MultiStepOperation;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.transformations.CancelInProgressSequence;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.PrepareMove;
@@ -75,13 +76,17 @@ public interface SingleNodeSequences
if (inProgress == null)
{
- logger.info("starting decom with {} {}", metadata.epoch, self);
+ logger.info("starting decommission with {} {}", metadata.epoch,
self);
ClusterMetadataService.instance().commit(new PrepareLeave(self,
force,
ClusterMetadataService.instance().placementProvider(),
LeaveStreams.Kind.UNBOOTSTRAP));
}
- else if (!InProgressSequences.isLeave(inProgress))
+ else if (InProgressSequences.isLeave(inProgress))
+ {
+ logger.info("Resuming decommission @ {} (current epoch = {}): {}",
inProgress.latestModification, metadata.epoch, inProgress.status());
+ }
+ else
{
throw new IllegalArgumentException("Can not decommission a node
that has an in-progress sequence");
}
@@ -165,4 +170,52 @@ public interface SingleNodeSequences
logger.debug("Successfully moved to new token {}",
StorageService.instance.getLocalTokens().iterator().next());
}
+ static void resumeMove()
+ {
+ if (ClusterMetadataService.instance().isMigrating() ||
ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP)
+ throw new IllegalStateException("This cluster is migrating to
cluster metadata, can't move until that is done.");
+
+ ClusterMetadata metadata = ClusterMetadata.current();
+ NodeId self = metadata.myNodeId();
+ MultiStepOperation<?> sequence =
metadata.inProgressSequences.get(self);
+ if (sequence == null || sequence.kind() !=
MultiStepOperation.Kind.MOVE)
+ {
+ String msg = "No move operation in progress, can't resume";
+ logger.info(msg);
+ throw new IllegalStateException(msg);
+ }
+ if (StorageService.instance.operationMode() !=
StorageService.Mode.MOVE_FAILED)
+ {
+ String msg = "Can't resume a move operation unless it has failed";
+ logger.info(msg);
+ throw new IllegalStateException(msg);
+ }
+ StorageService.instance.clearTransientMode();
+ InProgressSequences.finishInProgressSequences(self);
+ }
+
+ static void abortMove()
+ {
+ if (ClusterMetadataService.instance().isMigrating() ||
ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP)
+ throw new IllegalStateException("This cluster is migrating to
cluster metadata, can't move until that is done.");
+
+ ClusterMetadata metadata = ClusterMetadata.current();
+ NodeId self = metadata.myNodeId();
+ MultiStepOperation<?> sequence =
metadata.inProgressSequences.get(self);
+ if (sequence == null || sequence.kind() !=
MultiStepOperation.Kind.MOVE)
+ {
+ String msg = "No move operation in progress, can't abort";
+ logger.info(msg);
+ throw new IllegalStateException(msg);
+ }
+ if (StorageService.instance.operationMode() !=
StorageService.Mode.MOVE_FAILED)
+ {
+ String msg = "Can't abort a move operation unless it has failed";
+ logger.info(msg);
+ throw new IllegalStateException(msg);
+ }
+ StorageService.instance.clearTransientMode();
+ ClusterMetadataService.instance().commit(new
CancelInProgressSequence(self));
+ }
+
}
diff --git a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
index bd93d7b84c..9e1d1ea62c 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/UnbootstrapAndLeave.java
@@ -292,6 +292,12 @@ public class UnbootstrapAndLeave extends
MultiStepOperation<Epoch>
}
}
+ @Override
+ public boolean finishDuringStartup()
+ {
+ return false;
+ }
+
@Override
public String toString()
{
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 72ed9d3aec..5c75985bda 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -1010,6 +1010,16 @@ public class NodeProbe implements AutoCloseable
ssProxy.move(newToken);
}
+ public void resumeMove()
+ {
+ ssProxy.resumeMove();
+ }
+
+ public void abortMove()
+ {
+ ssProxy.abortMove();
+ }
+
public void removeNode(String token)
{
removeNode(token, false);
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Move.java b/src/java/org/apache/cassandra/tools/nodetool/Move.java
index 075e008503..87c085b86b 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Move.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Move.java
@@ -23,21 +23,46 @@ import io.airlift.airline.Command;
import java.io.IOException;
+import io.airlift.airline.Option;
import org.apache.cassandra.tools.NodeProbe;
import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
@Command(name = "move", description = "Move node on the token ring to a new
token")
public class Move extends NodeToolCmd
{
- @Arguments(usage = "<new token>", description = "The new token.", required
= true)
+ @Arguments(usage = "<new token>", description = "The new token.")
private String newToken = EMPTY;
+ @Option(title = "Resume an ongoing move operation", name = "--resume")
+ private boolean resume;
+
+ @Option(title = "Abort an ongoing move operation", name = "--abort")
+ private boolean abort;
+
@Override
public void execute(NodeProbe probe)
{
try
{
- probe.move(newToken);
+ if (!newToken.isEmpty())
+ {
+ if (resume || abort)
+ throw new IllegalArgumentException("Can't give both a
token and --resume/--abort");
+
+ probe.move(newToken);
+ }
+ else
+ {
+ if (abort && resume)
+ throw new IllegalArgumentException("Can't both resume and
abort");
+
+ if (resume)
+ probe.resumeMove();
+ else if (abort)
+ probe.abortMove();
+ else
+ throw new IllegalArgumentException("Need to give either a
token for a new move operation, or --resume/--abort for an existing one");
+ }
} catch (IOException e)
{
throw new RuntimeException("Error during moving node", e);
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 296f95b9b4..fe4a25be43 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -636,6 +636,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
}
catch (Throwable t)
{
+ startedAt.set(0);
if (t instanceof RuntimeException)
throw (RuntimeException) t;
throw new RuntimeException(t);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
index e2360aca2b..ddee1fef63 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.distributed.test;
+import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -27,9 +28,13 @@ import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.action.GossipHelper;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadataService;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.ownership.PlacementDeltas;
import org.apache.cassandra.tcm.sequences.UnbootstrapStreams;
@@ -41,7 +46,6 @@ import static
org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static
org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
import static
org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
-import static org.apache.cassandra.service.StorageService.Mode.NORMAL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -59,62 +63,31 @@ public class DecommissionTest extends TestBaseImpl
.start()))
{
IInvokableInstance instance = cluster.get(2);
-
+ assertBootstrapState(instance, COMPLETED);
+ instance.nodetoolResult("decommission", "--force")
+ .asserts()
+ .failure()
+ .stderrContains("simulated error in
prepareUnbootstrapStreaming");
+ instance.runOnInstance(() -> {
+
assertFalse(StorageService.instance.isDecommissioning());
+
assertTrue(StorageService.instance.isDecommissionFailed());
+ });
+
+ // still COMPLETED, nothing has changed
+ assertBootstrapState(instance, COMPLETED);
+ assertOperationMode(instance, DECOMMISSION_FAILED);
+ instance.nodetoolResult("decommission",
"--force").asserts().success();
+ instance.runOnInstance(() -> {
+
assertFalse(StorageService.instance.isDecommissionFailed());
+
assertFalse(StorageService.instance.isDecommissioning());
+ });
+ assertBootstrapState(instance, DECOMMISSIONED);
+ instance.nodetoolResult("decommission", "--force")
+ .asserts()
+ .success()
+ .stdoutContains("Node was already decommissioned");
+ assertBootstrapState(instance, DECOMMISSIONED);
instance.runOnInstance(() -> {
-
- assertEquals(COMPLETED.name(),
StorageService.instance.getBootstrapState());
-
- // pretend that decommissioning has failed in the middle
-
- try
- {
- StorageService.instance.decommission(true);
- fail("the first attempt to decommission should fail");
- }
- catch (Throwable t)
- {
- assertTrue(t.getMessage().contains("simulated error in
prepareUnbootstrapStreaming"));
- }
-
- assertFalse(StorageService.instance.isDecommissioning());
- assertTrue(StorageService.instance.isDecommissionFailed());
-
- // still COMPLETED, nothing has changed
- assertEquals(COMPLETED.name(),
StorageService.instance.getBootstrapState());
-
- String operationMode =
StorageService.instance.getOperationMode();
- assertEquals(DECOMMISSION_FAILED.name(), operationMode);
-
- // try to decommission again, now successfully
-
- try
- {
- StorageService.instance.decommission(true);
-
- // decommission was successful, so we reset failed
decommission mode
-
assertFalse(StorageService.instance.isDecommissionFailed());
-
- assertEquals(DECOMMISSIONED.name(),
StorageService.instance.getBootstrapState());
- assertFalse(StorageService.instance.isDecommissioning());
- }
- catch (Throwable t)
- {
- fail("the second decommission attempt should pass but it
failed on: " + t.getMessage());
- }
-
- assertEquals(DECOMMISSIONED.name(),
StorageService.instance.getBootstrapState());
- assertFalse(StorageService.instance.isDecommissionFailed());
-
- try
- {
- StorageService.instance.decommission(true);
- fail("Should have failed since the node is in
decomissioned state");
- }
- catch (UnsupportedOperationException e)
- {
- // ignore
- }
- assertEquals(DECOMMISSIONED.name(),
StorageService.instance.getBootstrapState());
assertFalse(StorageService.instance.isDecommissionFailed());
assertFalse(StorageService.instance.isDecommissioning());
});
@@ -137,46 +110,27 @@ public class DecommissionTest extends TestBaseImpl
.start()))
{
IInvokableInstance instance = cluster.get(2);
-
- instance.runOnInstance(() -> {
- assertEquals(COMPLETED.name(),
StorageService.instance.getBootstrapState());
-
- // pretend that decommissioning has failed in the middle
-
- try
- {
- StorageService.instance.decommission(true);
- fail("the first attempt to decommission should fail");
- }
- catch (Throwable t)
- {
- assertTrue(t.getMessage().contains("simulated error in
prepareUnbootstrapStreaming"));
- }
-
- // node is in DECOMMISSION_FAILED mode
- String operationMode =
StorageService.instance.getOperationMode();
- assertEquals(DECOMMISSION_FAILED.name(), operationMode);
- });
-
+ assertBootstrapState(instance, COMPLETED);
+ // pretend that decommissioning has failed in the middle
+ instance.nodetoolResult("decommission", "--force")
+ .asserts()
+ .failure()
+ .stderrContains("simulated error in
prepareUnbootstrapStreaming");
+ assertOperationMode(instance, DECOMMISSION_FAILED);
// restart the node which we failed to decommission
stopUnchecked(instance);
instance.startup();
-
- // it is back to normal so let's decommission again
-
- String oprationMode = instance.callOnInstance(() ->
StorageService.instance.getOperationMode());
- assertEquals(NORMAL.name(), oprationMode);
-
+ // it starts up as DECOMMISSION_FAILED so let's decommission again
+ assertOperationMode(instance, DECOMMISSION_FAILED);
+ instance.nodetoolResult("decommission",
"--force").asserts().success();
+ assertBootstrapState(instance, DECOMMISSIONED);
instance.runOnInstance(() -> {
- StorageService.instance.decommission(true);
- assertEquals(DECOMMISSIONED.name(),
StorageService.instance.getBootstrapState());
assertFalse(StorageService.instance.isDecommissionFailed());
assertFalse(StorageService.instance.isDecommissioning());
});
}
}
-
public static class BB
{
public static void install(ClassLoader classLoader, Integer num)
@@ -208,4 +162,43 @@ public class DecommissionTest extends TestBaseImpl
}
}
}
+
+ @Test
+ public void testRestartDecommedNode() throws IOException,
ExecutionException, InterruptedException
+ {
+ try (Cluster cluster = init(Cluster.build(2)
+ .withConfig(config ->
config.with(GOSSIP)
+
.with(NETWORK))
+ .start()))
+ {
+ cluster.get(2).nodetoolResult("decommission",
"--force").asserts().success();
+ cluster.get(2).shutdown().get();
+ try
+ {
+ cluster.get(2).startup();
+ fail();
+ }
+ catch (Exception e)
+ {
+ cluster.get(2).runOnInstance(() ->
ClusterMetadataService.unsetInstance());
+ assertTrue(e.getMessage().contains("This node was
decommissioned and will not rejoin the ring unless
cassandra.override_decommission=true"));
+ }
+
+
GossipHelper.withProperty(CassandraRelevantProperties.OVERRIDE_DECOMMISSION,
true, () -> cluster.get(2).startup());
+ assertBootstrapState(cluster.get(2), COMPLETED);
+ }
+ }
+
+ private static void assertBootstrapState(IInvokableInstance i,
SystemKeyspace.BootstrapState expectedState)
+ {
+ String bootstrapState = expectedState.name();
+ i.runOnInstance(() -> assertEquals(bootstrapState,
SystemKeyspace.getBootstrapState().name()));
+ }
+
+ private static void assertOperationMode(IInvokableInstance i,
StorageService.Mode mode)
+ {
+ String operationMode = mode.name();
+ i.runOnInstance(() -> assertEquals(operationMode,
StorageService.instance.operationMode().name()));
+ }
+
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FailingMoveTest.java b/test/distributed/org/apache/cassandra/distributed/test/FailingMoveTest.java
new file mode 100644
index 0000000000..ac60f90cd3
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/FailingMoveTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamPlan;
+import org.apache.cassandra.streaming.StreamResultFuture;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class FailingMoveTest extends TestBaseImpl
+{
+ @Test
+ public void testResumeMove() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(3)
+ .withoutVNodes()
+ .withConfig(c ->
c.with(Feature.GOSSIP, Feature.NETWORK))
+
.withInstanceInitializer(BB::install)
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl(id int
primary key);"));
+ for (int i=0; i<30; i++)
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO
%s.tbl (id) VALUES (?)"),
+ ConsistencyLevel.ALL, i);
+ String oldToken = getToken(cluster.get(3));
+ String moveToToken = "2305843009213693949";
+ assertNotEquals(oldToken, moveToToken);
+ cluster.get(3).nodetoolResult("move",
moveToToken).asserts().failure();
+ cluster.get(3).runOnInstance(() -> {
+ assertEquals(StorageService.Mode.MOVE_FAILED,
StorageService.instance.operationMode());
+ BB.shouldFail.set(false);
+ });
+
+ cluster.get(3).nodetoolResult("move",
"--resume").asserts().success();
+ cluster.get(3).runOnInstance(() ->
assertEquals(StorageService.Mode.NORMAL,
StorageService.instance.operationMode()));
+ assertEquals(moveToToken, getToken(cluster.get(3)));
+ }
+ }
+
+ @Test
+ public void testAbortMove() throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(3)
+ .withoutVNodes()
+ .withConfig(c ->
c.with(Feature.GOSSIP, Feature.NETWORK))
+
.withInstanceInitializer(BB::install)
+ .start()))
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl(id int
primary key);"));
+ for (int i=0; i<30; i++)
+ cluster.coordinator(1).execute(withKeyspace("INSERT INTO
%s.tbl (id) VALUES (?)"),
+ ConsistencyLevel.ALL, i);
+ String oldToken = getToken(cluster.get(3));
+ String moveToToken = "2305843009213693949";
+ assertNotEquals(oldToken, moveToToken);
+ cluster.get(3).nodetoolResult("move",
moveToToken).asserts().failure();
+ cluster.get(3).runOnInstance(() -> {
+ assertEquals(StorageService.Mode.MOVE_FAILED,
StorageService.instance.operationMode());
+ BB.shouldFail.set(false);
+ });
+
+ cluster.get(3).nodetoolResult("move",
"--abort").asserts().success();
+ cluster.get(3).runOnInstance(() ->
assertEquals(StorageService.Mode.NORMAL,
StorageService.instance.operationMode()));
+ assertNotEquals(moveToToken, getToken(cluster.get(3)));
+ }
+ }
+
+ private String getToken(IInvokableInstance instance)
+ {
+ return instance.callsOnInstance(() -> {
+ NodeId self = ClusterMetadata.current().myNodeId();
+ return
ClusterMetadata.current().tokenMap.tokens(self).iterator().next().toString();
+ }).call();
+ }
+
+ public static class BB
+ {
+ static AtomicBoolean shouldFail = new AtomicBoolean(true);
+ public static void install(ClassLoader classLoader, Integer num)
+ {
+ new ByteBuddy().rebase(StreamPlan.class)
+ .method(named("execute"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(classLoader,
ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ @SuppressWarnings("unused")
+ public static StreamResultFuture execute(@SuperCall
Callable<StreamResultFuture> zuper)
+ {
+ if (shouldFail.get())
+ throw new RuntimeException("failing stream");
+
+ try
+ {
+ return zuper.call();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
index fa379ffdc1..ea95003093 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java
@@ -59,6 +59,8 @@ public class JMXGetterCheckTest extends TestBaseImpl
"org.apache.cassandra.db:type=StorageService:clearConnectionHistory", //
Throws a NullPointerException
"org.apache.cassandra.db:type=StorageService:startGossiping", // causes
multiple loops to fail
"org.apache.cassandra.db:type=StorageService:startNativeTransport", //
causes multiple loops to fail
+ "org.apache.cassandra.db:type=StorageService:resumeMove", // throws since
there is no move in progress
+ "org.apache.cassandra.db:type=StorageService:abortMove", // throws since
there is no move in progress
"org.apache.cassandra.db:type=CIDRGroupsMappingManager:loadCidrGroupsCache", //
AllowAllCIDRAuthorizer doesn't support this operation, as feature is disabled
by default
"org.apache.cassandra.db:type=StorageService:forceRemoveCompletion" //
deprecated (TCM)
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]