This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-2.2
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-2.2 by this push:
     new 7d2c3c2  Support cross version messaging in in-jvm upgrade dtests
7d2c3c2 is described below

commit 7d2c3c215f65ee41f86886304257647fc24b1f70
Author: Blake Eggleston <bdeggles...@gmail.com>
AuthorDate: Thu Apr 4 14:39:57 2019 -0700

    Support cross version messaging in in-jvm upgrade dtests
    
    Patch by Blake Eggleston; Reviewed by Alex Petrov for CASSANDRA-15078
---
 CHANGES.txt                                        |  1 +
 .../cassandra/distributed/api/IInstance.java       |  3 +++
 .../distributed/impl/AbstractCluster.java          | 18 ++++++++++++++++-
 .../impl/DelegatingInvokableInstance.java          | 12 +++++++++++
 .../cassandra/distributed/impl/Instance.java       | 23 ++++++++++++++++------
 .../distributed/impl/InstanceClassLoader.java      | 11 +++++++++++
 .../distributed/upgrade/UpgradeTestBase.java       | 11 ++++-------
 7 files changed, 65 insertions(+), 14 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e887733..1cc4153 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.15
+ * Support cross version messaging in in-jvm upgrade dtests (CASSANDRA-15078)
  * Fix index summary redistribution cancellation (CASSANDRA-15045)
  * Refactor Circle CI configuration (CASSANDRA-14806)
  * Fixing invalid CQL in security documentation (CASSANDRA-15020)
diff --git a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
index 8c9f962..3834093 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/IInstance.java
@@ -42,4 +42,7 @@ public interface IInstance extends IIsolatedExecutor
     // these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
     void startup(ICluster cluster);
     void receiveMessage(IMessage message);
+
+    int getMessagingVersion();
+    void setMessagingVersion(InetAddressAndPort endpoint, int version);
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 2e759f5..1dc7a65 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -141,8 +141,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         {
             if (!isShutdown)
                 throw new IllegalStateException();
-            delegate().startup(AbstractCluster.this);
+            delegate.startup(AbstractCluster.this);
             isShutdown = false;
+            updateMessagingVersions();
         }
 
         @Override
@@ -252,6 +253,21 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }).run();
     }
 
+    private void updateMessagingVersions()
+    {
+        for (IInstance reportTo: instances)
+        {
+            for (IInstance reportFrom: instances)
+            {
+                if (reportFrom == reportTo)
+                    continue;
+
+                int minVersion = Math.min(reportFrom.getMessagingVersion(), reportTo.getMessagingVersion());
+                reportTo.setMessagingVersion(reportFrom.broadcastAddressAndPort(), minVersion);
+            }
+        }
+    }
+
     /**
      * Will wait for a schema change AND agreement that occurs after it is created
      * (and precedes the invocation to waitForAgreement)
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
index 27e2c04..e9e6844 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/DelegatingInvokableInstance.java
@@ -74,6 +74,18 @@ public abstract class DelegatingInvokableInstance implements IInvokableInstance
     }
 
     @Override
+    public int getMessagingVersion()
+    {
+        return delegate().getMessagingVersion();
+    }
+
+    @Override
+    public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+    {
+        delegate().setMessagingVersion(endpoint, version);
+    }
+
+    @Override
     public IInstanceConfig config()
     {
         return delegate().config();
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index e37d60f..dce03ca 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -30,7 +30,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.function.BiConsumer;
@@ -57,18 +56,17 @@ import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IListen;
 import org.apache.cassandra.distributed.api.IMessage;
-import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.CassandraMetricsRegistry;
 import org.apache.cassandra.net.IMessageSink;
 import org.apache.cassandra.net.MessageDeliveryTask;
 import org.apache.cassandra.net.MessageIn;
@@ -201,8 +199,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                 InetAddressAndPort from = broadcastAddressAndPort();
                 assert from.equals(lookupAddressAndPort.apply(messageOut.from));
                 InetAddressAndPort toFull = lookupAddressAndPort.apply(to);
-                messageOut.serialize(out, MessagingService.current_version);
-                deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, MessagingService.current_version, from));
+                int version = MessagingService.instance().getVersion(to);
+                messageOut.serialize(out, version);
+                deliver.accept(toFull, new Message(messageOut.verb.ordinal(), out.toByteArray(), id, version, from));
             }
             catch (IOException e)
             {
@@ -234,6 +233,16 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
         }).run();
     }
 
+    public int getMessagingVersion()
+    {
+        return callsOnInstance(() -> MessagingService.current_version).call();
+    }
+
+    public void setMessagingVersion(InetAddressAndPort endpoint, int version)
+    {
+        runOnInstance(() -> MessagingService.instance().setVersion(endpoint.address, version));
+    }
+
     @Override
     public void startup(ICluster cluster)
     {
@@ -340,7 +349,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
                         ApplicationState.STATUS,
                         new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(tokens.get(i))));
                 Gossiper.instance.realMarkAlive(ep.address, Gossiper.instance.getEndpointStateForEndpoint(ep.address));
-                MessagingService.instance().setVersion(ep.address, MessagingService.current_version);
+
+                int version = Math.min(MessagingService.current_version, cluster.get(ep).getMessagingVersion());
+                MessagingService.instance().setVersion(ep.address, version);
             }
 
             // check that all nodes are in token metadata
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
index 1722515..6fd5c7e 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceClassLoader.java
@@ -63,11 +63,15 @@ public class InstanceClassLoader extends URLClassLoader
         InstanceClassLoader create(int id, URL[] urls, ClassLoader sharedClassLoader);
     }
 
+    private final int id;
+    private final URL[] urls;
     private final ClassLoader sharedClassLoader;
 
     InstanceClassLoader(int id, URL[] urls, ClassLoader sharedClassLoader)
     {
         super(urls, null);
+        this.id = id;
+        this.urls = urls;
         this.sharedClassLoader = sharedClassLoader;
     }
 
@@ -102,4 +106,11 @@ public class InstanceClassLoader extends URLClassLoader
         return clazz.getClassLoader().getClass().getName().equals(InstanceClassLoader.class.getName());
     }
 
+    public String toString()
+    {
+        return "InstanceClassLoader{" +
+               "id=" + id +
+               ", urls=" + Arrays.toString(urls) +
+               '}';
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
index 812bdbe..4403767 100644
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTestBase.java
@@ -28,7 +28,8 @@ import org.apache.cassandra.distributed.impl.Versions;
 import org.apache.cassandra.distributed.impl.Versions.Version;
 import org.apache.cassandra.distributed.test.DistributedTestBase;
 
-import static org.apache.cassandra.distributed.impl.Versions.*;
+import static org.apache.cassandra.distributed.impl.Versions.Major;
+import static org.apache.cassandra.distributed.impl.Versions.find;
 
 public class UpgradeTestBase extends DistributedTestBase
 {
@@ -129,17 +130,13 @@ public class UpgradeTestBase extends DistributedTestBase
             {
                 try (UpgradeableCluster cluster = init(UpgradeableCluster.create(nodeCount, upgrade.initial)))
                 {
-                    cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-
-                    cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
-                    cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 2, 2)");
-                    cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 3, 3)");
+                    setup.run(cluster);
 
                     for (Version version : upgrade.upgrade)
                     {
                         for (int n = 1 ; n <= nodeCount ; ++n)
                         {
-                            cluster.get(n).shutdown();
+                            cluster.get(n).shutdown().get();
                             cluster.get(n).setVersion(version);
                             cluster.get(n).startup();
                             runAfterNodeUpgrade.run(cluster, n);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to