This is an automated email from the ASF dual-hosted git repository.

jonmeredith pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 251ae80bb0f27ebca0b88c43d944b55dd7c34201
Merge: 5e2985f10e 6ffa43f68b
Author: Jon Meredith <jonmered...@apache.org>
AuthorDate: Mon Aug 28 15:29:30 2023 -0600

    Merge branch 'cassandra-4.0' into cassandra-4.1

 .../distributed/impl/AbstractCluster.java          | 31 +++++++-
 .../distributed/impl/INodeProvisionStrategy.java   | 89 ++++++++++++++++++----
 .../cassandra/distributed/impl/InstanceConfig.java |  2 +-
 .../distributed/test/jmx/JMXFeatureTest.java       |  3 +-
 .../unit/org/apache/cassandra/net/SocketUtils.java | 57 ++++++++------
 5 files changed, 139 insertions(+), 43 deletions(-)

diff --cc 
test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 47371772b9,f57f2b76e2..c709cde82c
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -183,7 -164,7 +184,8 @@@ public abstract class AbstractCluster<
          extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, 
B>
      {
          private INodeProvisionStrategy.Strategy nodeProvisionStrategy = 
INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
 +        private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR;
+         private boolean dynamicPortAllocation = false;
  
          {
              // Indicate that we are running in the in-jvm dtest environment
@@@ -196,51 -177,35 +198,73 @@@
          public AbstractBuilder(Factory<I, C, B> factory)
          {
              super(factory);
 +            withSharedClasses(SHARED_PREDICATE);
          }
  
+         @SuppressWarnings("unchecked")
+         private B self()
+         {
+             return (B) this;
+         }
+ 
          public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy 
nodeProvisionStrategy)
          {
              this.nodeProvisionStrategy = nodeProvisionStrategy;
-             return (B) this;
+             return self();
          }
  
 +        public B withShutdownExecutor(ShutdownExecutor shutdownExecutor)
 +        {
 +            this.shutdownExecutor = shutdownExecutor;
-             return (B) this;
++            return self();
++        }
++
+         /**
+          * When {@code dynamicPortAllocation} is {@code true}, it will ask 
{@link INodeProvisionStrategy} to provision
+          * available storage, native and JMX ports in the given interface. 
When {@code dynamicPortAllocation} is
+          * {@code false} (the default behavior), it will use statically 
allocated ports based on the number of
+          * interfaces available and the node number.
+          *
+          * @param dynamicPortAllocation {@code true} for dynamic port 
allocation, {@code false} for static port
+          *                              allocation
+          * @return a reference to this Builder
+          */
+         public B withDynamicPortAllocation(boolean dynamicPortAllocation)
+         {
+             this.dynamicPortAllocation = dynamicPortAllocation;
+             return self();
          }
 +
 +        @Override
 +        public C createWithoutStarting() throws IOException
 +        {
 +            // if running as vnode but test sets withoutVNodes(), then skip 
the test
 +            // AbstractCluster.createInstanceConfig has similar logic, but 
handles the cases where the test
 +            // attempts to control tokens via config
 +            // when token supplier is defined, use getTokenCount() to see if 
vnodes is supported or not
 +            if (isVnode())
 +            {
 +                Assume.assumeTrue("vnode is not supported", isVNodeAllowed());
 +                // if token count > 1 and isVnode, then good
 +                Assume.assumeTrue("no-vnode is requested but not supported", 
getTokenCount() > 1);
 +            }
 +            else
 +            {
 +                Assume.assumeTrue("single-token is not supported", 
isSingleTokenAllowed());
 +                // if token count == 1 and isVnode == false, then 
goodAbstractClusterTest
 +                Assume.assumeTrue("vnode is requested but not supported", 
getTokenCount() == 1);
 +            }
 +
 +            return super.createWithoutStarting();
 +        }
 +
 +        private boolean isVnode()
 +        {
 +            TokenSupplier ts = getTokenSupplier();
 +            return ts == null
 +                   ? getTokenCount() > 1 // token supplier wasn't defined 
yet, so rely on getTokenCount()
 +                   : ts.tokens(1).size() > 1; // token supplier is defined... 
check the first instance to see what tokens are used
 +        }
      }
  
      protected class Wrapper extends DelegatingInvokableInstance implements 
IUpgradeableInstance
@@@ -536,9 -430,11 +560,10 @@@
          this.instanceMap = new ConcurrentHashMap<>();
          this.initialVersion = builder.getVersion();
          this.filters = new MessageFilters();
 -        this.instanceInitializer = builder.getInstanceInitializer();
 +        this.instanceInitializer = builder.getInstanceInitializer2();
          this.datadirCount = builder.getDatadirCount();
+         this.portMap = builder.dynamicPortAllocation ? new 
ConcurrentHashMap<>() : null;
  
 -        int generation = GENERATION.incrementAndGet();
          for (int i = 0; i < builder.getNodeCount(); ++i)
          {
              int nodeNum = i + 1;
@@@ -558,46 -454,15 +583,46 @@@
          return createInstanceConfig(size() + 1);
      }
  
 -    private InstanceConfig createInstanceConfig(int nodeNum)
 +    @VisibleForTesting
 +    InstanceConfig createInstanceConfig(int nodeNum)
      {
-         INodeProvisionStrategy provisionStrategy = 
nodeProvisionStrategy.create(subnet);
+         INodeProvisionStrategy provisionStrategy = 
nodeProvisionStrategy.create(subnet, portMap);
 -        long token = tokenSupplier.token(nodeNum);
 +        Collection<String> tokens = tokenSupplier.tokens(nodeNum);
          NetworkTopology topology = buildNetworkTopology(provisionStrategy, 
nodeIdTopology);
 -        InstanceConfig config = InstanceConfig.generate(nodeNum, 
provisionStrategy, topology, root, Long.toString(token), datadirCount);
 +        InstanceConfig config = InstanceConfig.generate(nodeNum, 
provisionStrategy, topology, root, tokens, datadirCount);
          config.set(Constants.KEY_DTEST_API_CLUSTER_ID, clusterId.toString());
 +        // if a test sets num_tokens directly, then respect it and only run 
if vnode or no-vnode is defined
 +        int defaultTokenCount = config.getInt("num_tokens");
 +        assert tokens.size() == defaultTokenCount : 
String.format("num_tokens=%d but tokens are %s; size does not match", 
defaultTokenCount, tokens);
 +        String defaultTokens = config.getString("initial_token");
          if (configUpdater != null)
 +        {
              configUpdater.accept(config);
 +            int testTokenCount = config.getInt("num_tokens");
 +            if (defaultTokenCount != testTokenCount)
 +            {
 +                if (testTokenCount == 1)
 +                {
 +                    // test is no-vnode, but running with vnode, so skip
 +                    Assume.assumeTrue("vnode is not supported", false);
 +                }
 +                else
 +                {
 +                    Assume.assumeTrue("no-vnode is requested but not 
supported", defaultTokenCount > 1);
 +                    // if the test controls initial_token or GOSSIP is 
enabled, then the test is safe to run
 +                    if 
(defaultTokens.equals(config.getString("initial_token")))
 +                    {
 +                        // test didn't define initial_token
 +                        Assume.assumeTrue("vnode is enabled and num_tokens is 
defined in test without GOSSIP or setting initial_token", 
config.has(Feature.GOSSIP));
 +                        config.remove("initial_token");
 +                    }
 +                    else
 +                    {
 +                        // test defined initial_token; trust it
 +                    }
 +                }
 +            }
 +        }
          return config;
      }
  
diff --cc 
test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
index 99ef272de0,017b11de26..f2c088c0e5
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java
@@@ -18,14 -18,14 +18,18 @@@
  
  package org.apache.cassandra.distributed.impl;
  
+ import java.util.Map;
+ import javax.annotation.Nullable;
+ 
+ import org.apache.cassandra.net.SocketUtils;
 +import org.apache.cassandra.utils.Shared;
  
 +import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
 +
 +@Shared(inner = INTERFACES)
  public interface INodeProvisionStrategy
  {
-     public enum Strategy
+     enum Strategy
      {
          OneNetworkInterface
          {
diff --cc test/unit/org/apache/cassandra/net/SocketUtils.java
index 78a49bddbf,6a6cd559e2..7d058d8400
--- a/test/unit/org/apache/cassandra/net/SocketUtils.java
+++ b/test/unit/org/apache/cassandra/net/SocketUtils.java
@@@ -19,39 -19,54 +19,52 @@@
  package org.apache.cassandra.net;
  
  import java.io.IOException;
+ import java.net.InetAddress;
  import java.net.ServerSocket;
- 
- import com.google.common.base.Throwables;
+ import java.net.UnknownHostException;
  
 -import com.google.common.base.Throwables;
 -
  public class SocketUtils
  {
-     public static synchronized int findAvailablePort() throws RuntimeException
+     /**
+      * Returns an available port for the given {@code bindAddress}. When an 
{@link IOException} occurs when opening a
+      * socket or if a {@link SecurityException} is raised because a manager 
exists and its checkListen method does
+      * not allow the operation, the {@code fallbackPort} is returned.
+      *
+      * @param bindAddress  the ip address for the interface where we need an 
available port number
+      * @param fallbackPort a port to return in case {@link SecurityException} 
or {@link IOException} is encountered
+      * @return an available port the given {@code bindAddress} when succeeds, 
otherwise the {@code fallbackPort}
+      * @throws RuntimeException if no IP address for the {@code bindAddress} 
could be found
+      */
+     public static synchronized int findAvailablePort(String bindAddress, int 
fallbackPort) throws RuntimeException
      {
-         ServerSocket ss = null;
          try
          {
-             // let the system pick an ephemeral port
-             ss = new ServerSocket(0);
-             ss.setReuseAddress(true);
-             return ss.getLocalPort();
+             return findAvailablePort(InetAddress.getByName(bindAddress), 
fallbackPort);
+         }
+         catch (UnknownHostException e)
+         {
+             throw new RuntimeException(e);
          }
-         catch (IOException e)
+     }
+ 
+     /**
+      * Returns an available port for the given {@code bindAddress}. When an 
{@link IOException} occurs when opening a
+      * socket or if a {@link SecurityException} is raised because a manager 
exists and its checkListen method does
+      * not allow the operation, the {@code fallbackPort} is returned.
+      *
+      * @param bindAddress  the ip address for the interface where we need an 
available port number
+      * @param fallbackPort a port to return in case {@link SecurityException} 
or {@link IOException} is encountered
+      * @return an available port the given {@code bindAddress} when succeeds, 
otherwise the {@code fallbackPort}
+      */
+     public static synchronized int findAvailablePort(InetAddress bindAddress, 
int fallbackPort)
+     {
+         try (ServerSocket socket = new ServerSocket(0, 50, bindAddress))
          {
-             throw Throwables.propagate(e);
+             return socket.getLocalPort();
          }
-         finally
+         catch (SecurityException | IOException exception)
          {
-             if (ss != null)
-             {
-                 try
-                 {
-                     ss.close();
-                 }
-                 catch (IOException e)
-                 {
-                     Throwables.propagate(e);
-                 }
-             }
+             return fallbackPort;
          }
      }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to