This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-2.2 by this push: new 7d2c3c2 Support cross version messaging in in-jvm upgrade dtests 7d2c3c2 is described below commit 7d2c3c215f65ee41f86886304257647fc24b1f70 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Thu Apr 4 14:39:57 2019 -0700 Support cross version messaging in in-jvm upgrade dtests Patch by Blake Eggleston; Reviewed by Alex Petrov for CASSANDRA-15078 --- CHANGES.txt | 1 + .../cassandra/distributed/api/IInstance.java | 3 +++ .../distributed/impl/AbstractCluster.java | 18 ++++++++++++++++- .../impl/DelegatingInvokableInstance.java | 12 +++++++++++ .../cassandra/distributed/impl/Instance.java | 23 ++++++++++++++++------ .../distributed/impl/InstanceClassLoader.java | 11 +++++++++++ .../distributed/upgrade/UpgradeTestBase.java | 11 ++++------- 7 files changed, 65 insertions(+), 14 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e887733..1cc4153 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.15 + * Support cross version messaging in in-jvm upgrade dtests (CASSANDRA-15078) * Fix index summary redistribution cancellation (CASSANDRA-15045) * Refactor Circle CI configuration (CASSANDRA-14806) * Fixing invalid CQL in security documentation (CASSANDRA-15020) diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java index 8c9f962..3834093 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java +++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java @@ -42,4 +42,7 @@ public interface IInstance extends IIsolatedExecutor // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface void startup(ICluster cluster); void receiveMessage(IMessage message); + + int getMessagingVersion(); + void setMessagingVersion(InetAddressAndPort endpoint, int version); } diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 2e759f5..1dc7a65 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -141,8 +141,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, { if (!isShutdown) throw new IllegalStateException(); - delegate().startup(AbstractCluster.this); + delegate.startup(AbstractCluster.this); isShutdown = false; + updateMessagingVersions(); } @Override @@ -252,6 +253,21 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster, }).run(); } + private void updateMessagingVersions() + { + for (IInstance reportTo: instances) + { + for (IInstance reportFrom: instances) + { + if (reportFrom == reportTo) + continue; + + int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion()); + reportTo.setMessagingVersion(reportFrom.broadcastAddressAndPort(), minVersion); + } + } + } + /** * Will wait for a schema change AND agreement that occurs after it is created * (and precedes the invocation to waitForAgreement) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java index 27e2c04..e9e6844 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java @@ -74,6 +74,18 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance } @Override + public int getMessagingVersion() + { + return delegate().getMessagingVersion(); + } + + @Override + public void setMessagingVersion(InetAddressAndPort endpoint, int version) + { + 
delegate().setMessagingVersion(endpoint, version); + } + + @Override public IInstanceConfig config() { return delegate().config(); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index e37d60f..dce03ca 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.function.BiConsumer; @@ -57,18 +56,17 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.distributed.api.IInstanceConfig; import org.apache.cassandra.distributed.api.IListen; import org.apache.cassandra.distributed.api.IMessage; -import org.apache.cassandra.distributed.api.ICluster; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.metrics.CassandraMetricsRegistry; import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.net.MessageDeliveryTask; import org.apache.cassandra.net.MessageIn; @@ -201,8 +199,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance InetAddressAndPort from = broadcastAddressAndPort(); assert 
from.equals(lookupAddressAndPort.apply(messageOut.from)); InetAddressAndPort toFull = lookupAddressAndPort.apply(to); - messageOut.serialize(out, MessagingService.current_version); - deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, MessagingService.current_version, from)); + int version = MessagingService.instance().getVersion(to); + messageOut.serialize(out, version); + deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from)); } catch (IOException e) { @@ -234,6 +233,16 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance }).run(); } + public int getMessagingVersion() + { + return callsOnInstance(() -> MessagingService.current_version).call(); + } + + public void setMessagingVersion(InetAddressAndPort endpoint, int version) + { + runOnInstance(() -> MessagingService.instance().setVersion(endpoint.address, version)); + } + @Override public void startup(ICluster cluster) { @@ -340,7 +349,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance ApplicationState.STATUS, new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i)))); Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address)); - MessagingService.instance().setVersion(ep.address, MessagingService.current_version); + + int version = Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion()); + MessagingService.instance().setVersion(ep.address, version); } // check that all nodes are in token metadata diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java index 1722515..6fd5c7e 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java @@ 
-63,11 +63,15 @@ public class InstanceClassLoader extends URLClassLoader InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader); } + private final int id; + private final URL[] urls; private final ClassLoader sharedClassLoader; InstanceClassLoader(int id, URL[] urls, ClassLoader sharedClassLoader) { super(urls, null); + this.id = id; + this.urls = urls; this.sharedClassLoader = sharedClassLoader; } @@ -102,4 +106,11 @@ public class InstanceClassLoader extends URLClassLoader return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName()); } + public String toString() + { + return "InstanceClassLoader{" + + "id=" + id + + ", urls=" + Arrays.toString(urls) + + '}'; + } } diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java index 812bdbe..4403767 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java @@ -28,7 +28,8 @@ import org.apache.cassandra.distributed.impl.Versions; import org.apache.cassandra.distributed.impl.Versions.Version; import org.apache.cassandra.distributed.test.DistributedTestBase; -import static org.apache.cassandra.distributed.impl.Versions.*; +import static org.apache.cassandra.distributed.impl.Versions.Major; +import static org.apache.cassandra.distributed.impl.Versions.find; public class UpgradeTestBase extends DistributedTestBase { @@ -129,17 +130,13 @@ public class UpgradeTestBase extends DistributedTestBase { try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial))) { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO 
" + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)"); + setup.run(cluster); for (Version version : upgrade.upgrade) { for (int n = 1 ; n <= nodeCount ; ++n) { - cluster.get(n).shutdown(); + cluster.get(n).shutdown().get(); cluster.get(n).setVersion(version); cluster.get(n).startup(); runAfterNodeUpgrade.run(cluster, n); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org