This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a7695a6220 Forbid nodes upgrading to a version which cannot read
existing log entries
a7695a6220 is described below
commit a7695a62200c3085fea634b9fdc8d393a057458e
Author: Aparna Naik <[email protected]>
AuthorDate: Fri Feb 13 12:03:36 2026 -0800
Forbid nodes upgrading to a version which cannot read existing log entries
Patch by Aparna Naik; reviewed by Marcus Eriksson and Sam Tunnicliffe
for CASSANDRA-21174
---
CHANGES.txt | 1 +
.../cassandra/tcm/transformations/Register.java | 18 +++
.../cassandra/tcm/transformations/Startup.java | 15 +++
...compatibleMetadataSerializationVersionTest.java | 111 +++++++++++++++
.../distributed/test/log/RegisterTest.java | 46 ++++---
test/unit/org/apache/cassandra/Util.java | 12 --
.../InsertUpdateIfConditionCollectionsTest.java | 17 +--
.../InsertUpdateIfConditionStaticsTest.java | 14 +-
.../operations/InsertUpdateIfConditionTest.java | 60 ++++++---
.../apache/cassandra/tcm/ClusterMetadataTest.java | 15 ++-
.../tcm/transformations/RegisterTest.java | 150 +++++++++++++++++++++
.../cassandra/tcm/transformations/StartupTest.java | 112 +++++++++++++++
12 files changed, 490 insertions(+), 81 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 12c457895a..701f7bf02c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Forbid nodes upgrading to a version which cannot read existing log entries (CASSANDRA-21174)
* Introduce a check for minimum time to pass to train or import a compression dictionary from the last one (CASSANDRA-21179)
* Allow overriding compaction strategy parameters during startup (CASSANDRA-21169)
* Introduce created_at column to system_distributed.compression_dictionaries (CASSANDRA-21178)
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Register.java b/src/java/org/apache/cassandra/tcm/transformations/Register.java
index 0b3ebb375f..1e31ff3a85 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/Register.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/Register.java
@@ -74,6 +74,24 @@ public class Register implements Transformation
@Override
public Result execute(ClusterMetadata prev)
{
+ // Ensure the joining node can read existing cluster metadata.
+ // Skip check for empty directory (first node in a new cluster).
+ if (!prev.directory.isEmpty())
+ {
+ Version clusterVersion = prev.directory.commonSerializationVersion;
+ Version newNodeVersion = version.serializationVersion();
+ if (newNodeVersion.isBefore(clusterVersion))
+ {
+ return new Rejected(INVALID,
+ String.format("Cannot register node: this node's metadata serialization version %s " +
+ "is lower than the cluster's minimum required version %s. " +
+ "Node would not be able to read cluster metadata. " +
+ "Please upgrade the node to a Cassandra version that supports " +
+ "metadata serialization version %s or higher before joining the cluster.",
+ newNodeVersion, clusterVersion, clusterVersion));
+ }
+ }
+
for (Map.Entry<NodeId, NodeAddresses> entry : prev.directory.addresses.entrySet())
{
NodeAddresses existingAddresses = entry.getValue();
diff --git a/src/java/org/apache/cassandra/tcm/transformations/Startup.java b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
index a1abfdbff4..7b8c4cff7e 100644
--- a/src/java/org/apache/cassandra/tcm/transformations/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/transformations/Startup.java
@@ -68,6 +68,21 @@ public class Startup implements Transformation
@Override
public Result execute(ClusterMetadata prev)
{
+ // Prevent downgrade to a version that cannot read cluster metadata.
+ // This protects against restarting a node with an older binary.
+ Version clusterVersion = prev.directory.commonSerializationVersion;
+ Version newNodeVersion = nodeVersion.serializationVersion();
+ if (newNodeVersion.isBefore(clusterVersion))
+ {
+ return new Rejected(INVALID,
+ String.format("Cannot start node: this node's metadata serialization version %s " +
+ "is lower than the cluster's minimum required version %s. " +
+ "Node would not be able to read cluster metadata. " +
+ "Please upgrade the node to a Cassandra version that supports " +
+ "metadata serialization version %s or higher before restarting.",
+ newNodeVersion, clusterVersion, clusterVersion));
+ }
+
ClusterMetadata.Transformer next = prev.transformer();
if (!prev.directory.addresses.get(nodeId).equals(addresses))
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java
new file mode 100644
index 0000000000..84e461b6e4
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/IncompatibleMetadataSerializationVersionTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.log;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static
org.apache.cassandra.tcm.membership.NodeVersion.CURRENT_METADATA_VERSION;
+
+public class IncompatibleMetadataSerializationVersionTest extends TestBaseImpl
+{
+ @Test
+ public void incompatibleVersionsCauseStartupFailureTest() throws Throwable
+ {
+ try (Cluster cluster = builder().withNodes(2)
+ .withInstanceInitializer(BB::install)
+ .createWithoutStarting())
+ {
+ cluster.get(1).startup();
+ // node1 has joined as normal so any entries committed to the
metadata log will be serialized with
+ // NodeVersion.CURRENT_METADATA_VERSION. We will join node2, but
the BB class used as an instanceInitializer
+ // will force it not to recognise this version. This simulates a
node running an older, incompatible version
+ // attempting to join the cluster and should fail as the metadata
log and snapshots it receives at startup
+ // are unreadable to it.
+ // We'll also set up the uncaught exceptions filter so that errors
reported by node2 do not automatically
+ // trigger a failure, so that we can assert that the specific
error we're expecting is thrown and logged.
+ cluster.setUncaughtExceptionsFilter((i, t) -> i != 2);
+ try
+ {
+ cluster.get(2).startup();
+ Assert.fail("Node2 startup should fail due to unsupported
metadata versions");
+ }
+ catch (Exception e)
+ {
+ String expectedError = String.format("Unsupported metadata
version \\(%s\\)", CURRENT_METADATA_VERSION.asInt());
+ Assert.assertFalse(cluster.get(2)
+ .logs()
+ .grep(expectedError)
+ .getResult()
+ .isEmpty());
+ }
+ }
+ }
+
+ public static class BB
+ {
+ static void install(ClassLoader cl, int node)
+ {
+ // only change behaviour of node2
+ if (node == 2)
+ {
+ new ByteBuddy().rebase(Version.class)
+ .method(named("fromInt"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl,
ClassLoadingStrategy.Default.INJECTION);
+
+ new ByteBuddy().rebase(NodeVersion.class)
+ .method(named("serializationVersion"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl,
ClassLoadingStrategy.Default.INJECTION);
+ }
+ }
+
+ public static Version serializationVersion()
+ {
+ // This is called during node startup when initializing the
LogState class and in particular its static
+ // defaultMessageSerializer field. We will emulate the behaviour
of a node running an old version.
+ return Version.V0;
+ }
+
+ public static Version fromInt(int i)
+ {
+ // Behave as if the supplied version is invalid, unless it is the
V0 value we are returning from the other
+ // intercepted method. This will cause any other version
encountered, such as when receiving versioned log
+ // entries from another node, to appear unreadable.
+ if (i == Version.V0.asInt())
+ return Version.V0;
+
+ throw new IllegalArgumentException("Unsupported metadata version
(" + i + ")");
+ }
+ }
+
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
index 393e9efd77..81006bbfc0 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/log/RegisterTest.java
@@ -49,7 +49,6 @@ import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.PrepareLeave;
import org.apache.cassandra.tcm.transformations.Register;
-import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.tcm.transformations.TriggerSnapshot;
import org.apache.cassandra.tcm.transformations.Unregister;
import org.apache.cassandra.utils.CassandraVersion;
@@ -58,6 +57,8 @@ import static org.junit.Assert.assertEquals;
public class RegisterTest extends TestBaseImpl
{
+ private static final Location TEST_LOCATION = new Location("datacenter1",
"rack1");
+
@Test
public void testRegistrationIdempotence() throws Throwable
{
@@ -103,28 +104,28 @@ public class RegisterTest extends TestBaseImpl
try (Cluster cluster = builder().withNodes(1)
.createWithoutStarting())
{
- final String firstNodeEndpoint = "127.0.0.10";
cluster.get(1).startup();
cluster.get(1).runOnInstance(() -> {
try
{
- // Register a ghost node with V0 to fake-force V0
serialization. In a real world cluster we will always be upgrading from a
smaller version.
- ClusterMetadataService.instance().commit(new Register(new
NodeAddresses(InetAddressAndPort.getByName(firstNodeEndpoint)),
-
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
+ // Unregister to make directory empty
+ ClusterMetadataService.instance().commit(new
Unregister(ClusterMetadata.current().myNodeId(),
+
EnumSet.allOf(NodeState.class),
+
ClusterMetadataService.instance().placementProvider()));
+
+ // Register a ghost node with V0 (bypasses version check
because directory is now empty).
+ // In a real world cluster we will always be upgrading
from a smaller version.
+ ClusterMetadataService.instance().commit(new Register(new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.100")),
+
TEST_LOCATION,
new
NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
- NodeId oldNode =
ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName(firstNodeEndpoint));
- // Fake an upgrade of this node and assert we continue to
serialize so that the one which only
- // supports V0 can deserialize. In a real cluster it
wouldn't happen exactly in this way (here the
- // min serialization version actually goes backwards from
CURRENT to V0 when we upgrade, which would
- // not happen in a real cluster as we would never register
like oldNode, with the current C* version
- // but an older metadata version
+ NodeId oldNode =
ClusterMetadata.current().directory.peerId(InetAddressAndPort.getByName("127.0.0.100"));
+
+ // Register a node with upgraded version
CassandraVersion currentVersion =
NodeVersion.CURRENT.cassandraVersion;
NodeVersion upgraded = new NodeVersion(new
CassandraVersion(String.format("%d.%d.%d", currentVersion.major + 1, 0, 0)),
NodeVersion.CURRENT_METADATA_VERSION);
- ClusterMetadata metadata = ClusterMetadata.current();
- NodeId id = metadata.myNodeId();
- Startup startup = new Startup(id,
metadata.directory.getNodeAddresses(id), upgraded);
- ClusterMetadataService.instance().commit(startup);
+ ClusterMetadataService.instance().commit(new Register(new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.200")), TEST_LOCATION,
upgraded));
+
// Doesn't matter which specific Transformation we use
here, we're testing that the serializer uses
// the correct lower bound
Transformation t = new Register(NodeAddresses.current(),
new Location("DC", "RACK"), NodeVersion.CURRENT);
@@ -173,9 +174,15 @@ public class RegisterTest extends TestBaseImpl
cluster.get(1).runOnInstance(() -> {
try
{
- // Register a ghost node with V0 to fake-force V0
serialization. In a real world cluster we will always be upgrading from a
smaller version.
- ClusterMetadataService.instance().commit(new Register(new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.10")),
-
ClusterMetadata.current().directory.location(ClusterMetadata.current().myNodeId()),
+ // Unregister to make directory empty
+ ClusterMetadataService.instance().commit(new
Unregister(ClusterMetadata.current().myNodeId(),
+
EnumSet.allOf(NodeState.class),
+
ClusterMetadataService.instance().placementProvider()));
+
+ // Register a ghost node with V0 (bypasses version check
because directory is now empty).
+ // In a real world cluster we will always be upgrading
from a smaller version.
+ ClusterMetadataService.instance().commit(new Register(new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.100")),
+
TEST_LOCATION,
new
NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V0)));
}
catch (UnknownHostException e)
@@ -187,9 +194,6 @@ public class RegisterTest extends TestBaseImpl
ClusterMetadata cm = new
MetadataSnapshots.SystemKeyspaceMetadataSnapshots().getSnapshot(ClusterMetadata.current().epoch);
cm.equals(ClusterMetadata.current());
});
-
-
}
}
-
}
diff --git a/test/unit/org/apache/cassandra/Util.java
b/test/unit/org/apache/cassandra/Util.java
index 9f7a12a854..1bea5d4d4b 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -151,13 +151,8 @@ import org.apache.cassandra.service.snapshot.TableSnapshot;
import org.apache.cassandra.streaming.StreamResultFuture;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.tcm.ClusterMetadata;
-import org.apache.cassandra.tcm.membership.NodeAddresses;
-import org.apache.cassandra.tcm.membership.NodeVersion;
-import org.apache.cassandra.tcm.serialization.Version;
-import org.apache.cassandra.tcm.transformations.Register;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.CounterId;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.FilterFactory;
@@ -1205,13 +1200,6 @@ public class Util
assertEquals(0, ((SSTableReaderWithFilter)
reader).getFilterOffHeapSize());
}
- public static void setUpgradeFromVersion(String version)
- {
- InetAddressAndPort ep =
InetAddressAndPort.getByNameUnchecked("127.0.0.10");
- Register.register(new NodeAddresses(ep),
- new NodeVersion(new CassandraVersion(version),
Version.OLD));
- }
-
/**
* Sets the length of the file to given size. File will be created if not
exist.
*
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
index a2e4be9b91..695d5d7410 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionCollectionsTest.java
@@ -22,17 +22,16 @@ import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.SyntaxException;
+import org.apache.cassandra.tcm.membership.NodeVersion;
/* InsertUpdateIfConditionCollectionsTest class has been split into multiple
ones because of timeout issues (CASSANDRA-16670)
* Any changes here check if they apply to the other classes
@@ -46,18 +45,16 @@ public class InsertUpdateIfConditionCollectionsTest extends
CQLTester
@Parameterized.Parameters(name = "{index}: clusterMinVersion={0}")
public static Collection<Object[]> data()
{
- ServerTestUtils.daemonInitialization();
-
return InsertUpdateIfConditionTest.data();
}
@Parameterized.Parameter(0)
- public String clusterMinVersion;
+ public NodeVersion clusterMinVersion;
@BeforeClass
- public static void beforeClass()
+ public static void setUpClass()
{
- InsertUpdateIfConditionTest.beforeClass();
+ InsertUpdateIfConditionTest.setUpClass();
}
@Before
@@ -66,12 +63,6 @@ public class InsertUpdateIfConditionCollectionsTest extends
CQLTester
InsertUpdateIfConditionTest.beforeSetup(clusterMinVersion);
}
- @AfterClass
- public static void afterClass()
- {
- InsertUpdateIfConditionTest.afterClass();
- }
-
/**
* Migrated from cql_tests.py:TestCQL.bug_6069_test()
*/
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
index 7b670956a0..25d1387050 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionStaticsTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.cql3.validation.operations;
import java.util.Collection;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -29,6 +28,7 @@ import org.junit.runners.Parameterized;
import org.apache.cassandra.ServerTestUtils;
import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.tcm.membership.NodeVersion;
/* InsertUpdateIfConditionCollectionsTest class has been split into multiple
ones because of timeout issues (CASSANDRA-16670)
* Any changes here check if they apply to the other classes
@@ -47,12 +47,12 @@ public class InsertUpdateIfConditionStaticsTest extends
CQLTester
}
@Parameterized.Parameter(0)
- public String clusterMinVersion;
+ public NodeVersion clusterMinVersion;
@BeforeClass
- public static void beforeClass()
+ public static void setUpClass()
{
- InsertUpdateIfConditionTest.beforeClass();
+ InsertUpdateIfConditionTest.setUpClass();
}
@Before
@@ -61,12 +61,6 @@ public class InsertUpdateIfConditionStaticsTest extends
CQLTester
InsertUpdateIfConditionTest.beforeSetup(clusterMinVersion);
}
- @AfterClass
- public static void afterClass()
- {
- InsertUpdateIfConditionTest.afterClass();
- }
-
/**
* Migrated from cql_tests.py:TestCQL.static_columns_cas_test()
*/
diff --git
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
index 750141674a..68663d3f85 100644
---
a/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
+++
b/test/unit/org/apache/cassandra/cql3/validation/operations/InsertUpdateIfConditionTest.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.cql3.validation.operations;
import java.util.Arrays;
import java.util.Collection;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -29,12 +28,20 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.apache.cassandra.ServerTestUtils;
-import org.apache.cassandra.Util;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.Duration;
-import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.SchemaKeyspaceTables;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.tcm.transformations.Register;
+import org.apache.cassandra.utils.CassandraVersion;
import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
@@ -53,16 +60,23 @@ public class InsertUpdateIfConditionTest extends CQLTester
{
ServerTestUtils.daemonInitialization();
// TODO [tcm] we will require upgrading from 4.1
- return Arrays.asList(new Object[]{ "4.1" }, new Object[]{ "4.0" });
+ return Arrays.asList(new Object[]{ NodeVersion.CURRENT },
+ new Object[]{ new NodeVersion( new
CassandraVersion("4.1"), Version.OLD) },
+ new Object[]{ new NodeVersion( new
CassandraVersion("4.0"), Version.OLD) });
}
@Parameterized.Parameter(0)
- public String clusterMinVersion;
+ public NodeVersion clusterMinVersion;
@BeforeClass
- public static void beforeClass()
+ public static void setUpClass()
{
- Gossiper.instance.start(0);
+ // This intentionally shadows CQLTester::setUpClass, in order to
initialize the ClusterMetadataService
+ // without automatically registering the first node. This is so the
directory can be setup to mimic a
+ // mid-upgrade cluster with nodes on both old and new versions.
+ prePrepareServer();
+ ServerTestUtils.prepareServerNoRegister();
+ ServerTestUtils.markCMS(); // CQLTester::afterTest will reset the CMS
& ClusterMetadata to this state
}
@Before
@@ -70,21 +84,27 @@ public class InsertUpdateIfConditionTest extends CQLTester
{
beforeSetup(clusterMinVersion);
}
-
- public static void beforeSetup(String clusterMinVersion)
- {
- // setUpgradeFromVersion adds node2 to the Gossiper. On slow CI envs
the Gossiper might auto-remove it after some
- // timeout if it thinks it's a fat client making the test fail. Just
retry C18393.
- Util.spinAssertEquals(Boolean.TRUE, () -> {
- Util.setUpgradeFromVersion(clusterMinVersion);
- return true;
- }, 5);
- }
- @AfterClass
- public static void afterClass()
+ public static void beforeSetup(NodeVersion clusterMinVersion)
{
- Gossiper.instance.stop();
+ // Add two entries to ClusterMetadata, to make it potentially appear
as a mixed-version cluster (if the
+ // supplied version is lower than current).
+ ClusterMetadataService.instance()
+ .commit(new Register(new
NodeAddresses(InetAddressAndPort.getByNameUnchecked("127.0.0.10")),
+ new Location("dc1",
"rack1"),
+ clusterMinVersion));
+ ClusterMetadataService.instance()
+ .commit(new Register(new
NodeAddresses(InetAddressAndPort.getByNameUnchecked("127.0.0.20")),
+ new Location("dc1",
"rack1"),
+ NodeVersion.CURRENT));
+
+ Directory directory = ClusterMetadata.current().directory;
+ assertEquals(directory.clusterMinVersion, clusterMinVersion);
+ assertEquals(NodeVersion.CURRENT, directory.clusterMaxVersion);
+ // Version.OLD does not influence commonSerializationVersion as
un-upgraded nodes are completely outside
+ // the scope of maintaining backward compatible metadata
serializations (i.e. they predate them entirely). So
+ // in this test, the common serialization version will always be the
current one.
+ assertEquals(NodeVersion.CURRENT_METADATA_VERSION,
ClusterMetadata.current().directory.commonSerializationVersion);
}
/**
diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
index ac3a278cf2..56af302c23 100644
--- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
+++ b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTest.java
@@ -34,17 +34,17 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Keyspaces;
import org.apache.cassandra.schema.SchemaTransformation;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
import org.apache.cassandra.tcm.membership.NodeId;
import org.apache.cassandra.tcm.membership.NodeVersion;
import org.apache.cassandra.tcm.ownership.DataPlacement;
-import org.apache.cassandra.tcm.ownership.UniformRangePlacement;
import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
import org.apache.cassandra.tcm.sequences.LockedRanges;
import org.apache.cassandra.tcm.sequences.UnbootstrapAndLeave;
import org.apache.cassandra.tcm.serialization.Version;
import org.apache.cassandra.tcm.transformations.AlterSchema;
-import org.apache.cassandra.tcm.transformations.Assassinate;
import org.apache.cassandra.tcm.transformations.CustomTransformation;
+import org.apache.cassandra.tcm.transformations.Startup;
import org.apache.cassandra.utils.CassandraVersion;
import static
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.addr;
@@ -137,11 +137,15 @@ public class ClusterMetadataTest
private static void newTransformationHelper(Transformation transformation)
{
NodeId v4Node = null;
+ NodeAddresses v4Addresses = null;
for (int i = 1; i <= 4; i++)
{
- NodeId nodeId = ClusterMetadataTestHelper.register(addr(i), "dc0",
"rack0", new NodeVersion(CassandraVersion.CASSANDRA_5_0, i == 4 ? Version.V4 :
Version.V5));
- if (i == 4)
+ NodeId nodeId = ClusterMetadataTestHelper.register(addr(i), "dc0",
"rack0", new NodeVersion(CassandraVersion.CASSANDRA_5_0, i == 1 ? Version.V4 :
Version.V5));
+ if (i == 1)
+ {
v4Node = nodeId;
+ v4Addresses = new NodeAddresses(addr(i));
+ }
ClusterMetadataTestHelper.join(i, i);
}
@@ -154,7 +158,8 @@ public class ClusterMetadataTest
{
assertTrue(e.getMessage().contains("Transformation rejected"));
}
- ClusterMetadataService.instance().commit(new Assassinate(v4Node, new
UniformRangePlacement()));
+ // "upgrade" v4Node and the transformation should become committable
+ ClusterMetadataService.instance().commit(new Startup(v4Node,
v4Addresses, new NodeVersion(CassandraVersion.CASSANDRA_5_0, Version.V5)));
ClusterMetadataService.instance().commit(transformation);
}
diff --git
a/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java
b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java
new file mode 100644
index 0000000000..3b385b01b4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/transformations/RegisterTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RegisterTest
+{
+ private static final Location LOCATION = new Location("dc", "rack");
+
+ /**
+ * Tests that registering a new node with a serialization version lower
than the cluster's
+ * commonSerializationVersion is rejected.
+ */
+ @Test
+ public void rejectsLowerSerializationVersion() throws UnknownHostException
+ {
+ NodeId existingNode = new NodeId(1);
+
+ Directory directory = Directory.EMPTY
+ .unsafeWithNodeForTesting(existingNode,
+ new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")),
+ LOCATION,
+ NodeVersion.CURRENT)
+ .withNodeState(existingNode, NodeState.JOINED);
+
+ ClusterMetadata metadata =
ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+ .transformer()
+ .with(directory)
+ .build().metadata;
+
+ assertEquals("commonSerializationVersion should be
CURRENT_METADATA_VERSION", NodeVersion.CURRENT_METADATA_VERSION,
metadata.directory.commonSerializationVersion);
+
+ // Try to register a new node with V3 (lower than cluster's current
version)
+ NodeVersion lowerVersion = new
NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3);
+ Register register = new Register(
+ new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")),
+ LOCATION,
+ lowerVersion
+ );
+
+ Transformation.Result result = register.execute(metadata);
+
+ assertTrue("Registration should be rejected for node with lower
serialization version", result.isRejected());
+ assertEquals(ExceptionCode.INVALID, result.rejected().code);
+ }
+
+ /**
+ * Tests that registering nodes with serialization version equal to or
higher than
+ * the cluster's commonSerializationVersion is allowed.
+ */
+ @Test
+ public void allowsEqualOrHigherSerializationVersion() throws
UnknownHostException
+ {
+ NodeId existingNode = new NodeId(1);
+ NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion,
Version.V3);
+
+ Directory directory = Directory.EMPTY
+ .unsafeWithNodeForTesting(existingNode,
+ new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")),
+ LOCATION,
+ v3)
+ .withNodeState(existingNode, NodeState.JOINED);
+
+ ClusterMetadata metadata =
ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+ .transformer()
+ .with(directory)
+ .build().metadata;
+
+ assertEquals("commonSerializationVersion should be V3", Version.V3,
metadata.directory.commonSerializationVersion);
+
+ // Register a node with higher version - should succeed
+ Register registerHigher = new Register(
+ new NodeAddresses(InetAddressAndPort.getByName("127.0.0.2")),
+ LOCATION,
+ NodeVersion.CURRENT
+ );
+
+ Transformation.Result resultHigher = registerHigher.execute(metadata);
+ assertTrue("Registration should succeed for node with higher
serialization version", resultHigher.isSuccess());
+
+ // Register a node with equal version - should succeed
+ Register registerEqual = new Register(
+ new NodeAddresses(InetAddressAndPort.getByName("127.0.0.3")),
+ LOCATION,
+ v3
+ );
+
+ Transformation.Result resultEqual = registerEqual.execute(metadata);
+ assertTrue("Registration should succeed for node with equal
serialization version", resultEqual.isSuccess());
+ }
+
+ /**
+ * Tests that the first node in an empty cluster can register with any
version
+ * (bypasses version check because directory is empty).
+ */
+ @Test
+ public void allowsAnyVersionForFirstNode() throws UnknownHostException
+ {
+ ClusterMetadata metadata =
ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance);
+
+ assertTrue("Directory should be empty", metadata.directory.isEmpty());
+
+ // Register first node with V0 - should succeed because directory is
empty
+ NodeVersion v0 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion,
Version.V0);
+ Register register = new Register(
+ new NodeAddresses(InetAddressAndPort.getByName("127.0.0.1")),
+ LOCATION,
+ v0
+ );
+
+ Transformation.Result result = register.execute(metadata);
+ assertTrue("First node registration should succeed with any version",
result.isSuccess());
+ }
+}
\ No newline at end of file
diff --git
a/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java
b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java
new file mode 100644
index 0000000000..8e0c165a22
--- /dev/null
+++ b/test/unit/org/apache/cassandra/tcm/transformations/StartupTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.net.UnknownHostException;
+
+import org.junit.Test;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
+import org.apache.cassandra.exceptions.ExceptionCode;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.Directory;
+import org.apache.cassandra.tcm.membership.Location;
+import org.apache.cassandra.tcm.membership.NodeAddresses;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.membership.NodeState;
+import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class StartupTest // unit tests for how the Startup transformation treats the serialization version a node reports
+{
+ private static final Location LOCATION = new Location("dc", "rack"); // location shared by every node fixture in this class
+
+ /**
+ * Tests that the Startup transformation rejects downgrading a node to a version
+ * that cannot read cluster metadata.
+ * <p>
+ * The directory holds a single JOINED node registered at {@code NodeVersion.CURRENT}.
+ * A {@link Startup} for that same node advertising {@code Version.V3} is then executed
+ * and is expected to be rejected with {@code ExceptionCode.INVALID}.
+ * <p>
+ * NOTE(review): this presumes {@code Version.V3} is lower than
+ * {@code NodeVersion.CURRENT_METADATA_VERSION} - confirm whenever the current version changes.
+ *
+ * @throws UnknownHostException declared because the node address is built with
+ * {@code InetAddressAndPort.getByName}
+ */
+ @Test
+ public void rejectsDowngrade() throws UnknownHostException
+ {
+ NodeId nodeId = new NodeId(1);
+ NodeAddresses addresses = new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.1"));
+
+ Directory directory = Directory.EMPTY
+ .unsafeWithNodeForTesting(nodeId, addresses,
LOCATION, NodeVersion.CURRENT)
+ .withNodeState(nodeId, NodeState.JOINED);
+
+ ClusterMetadata metadata =
ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+ .transformer()
+ .with(directory)
+ .build().metadata;
+
+ assertEquals("commonSerializationVersion should be
CURRENT_METADATA_VERSION",
+ NodeVersion.CURRENT_METADATA_VERSION,
metadata.directory.commonSerializationVersion); // sanity check on the fixture before exercising Startup
+
+ // Try to "downgrade" the node to V3 (simulating restart with older
binary)
+ NodeVersion downgradedVersion = new
NodeVersion(NodeVersion.CURRENT.cassandraVersion, Version.V3);
+ Startup startup = new Startup(nodeId, addresses, downgradedVersion); // same node id and addresses as the registered node
+
+ Transformation.Result result = startup.execute(metadata);
+
+ assertTrue("Startup should be rejected for downgrade to lower
serialization version", result.isRejected());
+ assertEquals(ExceptionCode.INVALID, result.rejected().code); // the rejection must carry the INVALID code
+ }
+
+ /**
+ * Tests that the Startup transformation allows a node to restart with equal or higher
+ * serialization version.
+ * <p>
+ * The directory holds a single JOINED node registered at {@code Version.V3}. Both
+ * {@link Startup} transformations are executed against that same original metadata;
+ * the result of the first is not applied before the second runs.
+ * <p>
+ * NOTE(review): the "higher" case presumes {@code NodeVersion.CURRENT} carries a
+ * serialization version above V3 - confirm.
+ *
+ * @throws UnknownHostException declared because the node address is built with
+ * {@code InetAddressAndPort.getByName}
+ */
+ @Test
+ public void allowsEqualOrHigherSerializationVersion() throws
UnknownHostException
+ {
+ NodeId nodeId = new NodeId(1);
+ NodeAddresses addresses = new
NodeAddresses(InetAddressAndPort.getByName("127.0.0.1"));
+ NodeVersion v3 = new NodeVersion(NodeVersion.CURRENT.cassandraVersion,
Version.V3);
+
+ Directory directory = Directory.EMPTY
+ .unsafeWithNodeForTesting(nodeId, addresses,
LOCATION, v3)
+ .withNodeState(nodeId, NodeState.JOINED);
+
+ ClusterMetadata metadata =
ClusterMetadataTestHelper.minimalForTesting(Murmur3Partitioner.instance)
+ .transformer()
+ .with(directory)
+ .build().metadata;
+
+ assertEquals("commonSerializationVersion should be V3", Version.V3,
metadata.directory.commonSerializationVersion); // sanity check on the fixture before exercising Startup
+
+ // Startup with higher version - should succeed
+ Startup startupHigher = new Startup(nodeId, addresses,
NodeVersion.CURRENT);
+
+ Transformation.Result resultHigher = startupHigher.execute(metadata);
+ assertTrue("Startup should succeed for higher serialization version",
resultHigher.isSuccess());
+
+ // Startup with equal version - should succeed
+ Startup startupEqual = new Startup(nodeId, addresses, v3);
+
+ Transformation.Result resultEqual = startupEqual.execute(metadata); // runs against the original metadata, not resultHigher's output
+ assertTrue("Startup should succeed for equal serialization version",
resultEqual.isSuccess());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]