This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 251ae80bb0f27ebca0b88c43d944b55dd7c34201 Merge: 5e2985f10e 6ffa43f68b Author: Jon Meredith <jonmered...@apache.org> AuthorDate: Mon Aug 28 15:29:30 2023 -0600 Merge branch 'cassandra-4.0' into cassandra-4.1 .../distributed/impl/AbstractCluster.java | 31 +++++++- .../distributed/impl/INodeProvisionStrategy.java | 89 ++++++++++++++++++---- .../cassandra/distributed/impl/InstanceConfig.java | 2 +- .../distributed/test/jmx/JMXFeatureTest.java | 3 +- .../unit/org/apache/cassandra/net/SocketUtils.java | 57 ++++++++------ 5 files changed, 139 insertions(+), 43 deletions(-) diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 47371772b9,f57f2b76e2..c709cde82c --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@@ -183,7 -164,7 +184,8 @@@ public abstract class AbstractCluster< extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B> { private INodeProvisionStrategy.Strategy nodeProvisionStrategy = INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces; + private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR; + private boolean dynamicPortAllocation = false; { // Indicate that we are running in the in-jvm dtest environment @@@ -196,51 -177,35 +198,73 @@@ public AbstractBuilder(Factory<I, C, B> factory) { super(factory); + withSharedClasses(SHARED_PREDICATE); } + @SuppressWarnings("unchecked") + private B self() + { + return (B) this; + } + public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy nodeProvisionStrategy) { this.nodeProvisionStrategy = nodeProvisionStrategy; - return (B) this; + return self(); } + public B withShutdownExecutor(ShutdownExecutor shutdownExecutor) + { + this.shutdownExecutor = shutdownExecutor; - return (B) this; ++ return self(); ++ } ++ + /** + * When {@code dynamicPortAllocation} is {@code true}, it will ask {@link INodeProvisionStrategy} to provision + * available storage, native and JMX ports in the given interface. When {@code dynamicPortAllocation} is + * {@code false} (the default behavior), it will use statically allocated ports based on the number of + * interfaces available and the node number. + * + * @param dynamicPortAllocation {@code true} for dynamic port allocation, {@code false} for static port + * allocation + * @return a reference to this Builder + */ + public B withDynamicPortAllocation(boolean dynamicPortAllocation) + { + this.dynamicPortAllocation = dynamicPortAllocation; + return self(); } + + @Override + public C createWithoutStarting() throws IOException + { + // if running as vnode but test sets withoutVNodes(), then skip the test + // AbstractCluster.createInstanceConfig has similar logic, but handles the cases where the test + // attempts to control tokens via config + // when token supplier is defined, use getTokenCount() to see if vnodes is supported or not + if (isVnode()) + { + Assume.assumeTrue("vnode is not supported", isVNodeAllowed()); + // if token count > 1 and isVnode, then good + Assume.assumeTrue("no-vnode is requested but not supported", getTokenCount() > 1); + } + else + { + Assume.assumeTrue("single-token is not supported", isSingleTokenAllowed()); + // if token count == 1 and isVnode == false, then goodAbstractClusterTest + Assume.assumeTrue("vnode is requested but not supported", getTokenCount() == 1); + } + + return super.createWithoutStarting(); + } + + private boolean isVnode() + { + TokenSupplier ts = getTokenSupplier(); + return ts == null + ? getTokenCount() > 1 // token supplier wasn't defined yet, so rely on getTokenCount() + : ts.tokens(1).size() > 1; // token supplier is defined... check the first instance to see what tokens are used + } } protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance @@@ -536,9 -430,11 +560,10 @@@ this.instanceMap = new ConcurrentHashMap<>(); this.initialVersion = builder.getVersion(); this.filters = new MessageFilters(); - this.instanceInitializer = builder.getInstanceInitializer(); + this.instanceInitializer = builder.getInstanceInitializer2(); this.datadirCount = builder.getDatadirCount(); + this.portMap = builder.dynamicPortAllocation ? new ConcurrentHashMap<>() : null; - int generation = GENERATION.incrementAndGet(); for (int i = 0; i < builder.getNodeCount(); ++i) { int nodeNum = i + 1; @@@ -558,46 -454,15 +583,46 @@@ return createInstanceConfig(size() + 1); } - private InstanceConfig createInstanceConfig(int nodeNum) + @VisibleForTesting + InstanceConfig createInstanceConfig(int nodeNum) { - INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet); + INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet, portMap); - long token = tokenSupplier.token(nodeNum); + Collection<String> tokens = tokenSupplier.tokens(nodeNum); NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology); - InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token), datadirCount); + InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, tokens, datadirCount); config.set(Constants.KEY_DTEST_API_CLUSTER_ID, clusterId.toString()); + // if a test sets num_tokens directly, then respect it and only run if vnode or no-vnode is defined + int defaultTokenCount = config.getInt("num_tokens"); + assert tokens.size() == defaultTokenCount : String.format("num_tokens=%d but tokens are %s; size does not match", defaultTokenCount, tokens); + String defaultTokens = config.getString("initial_token"); if (configUpdater != null) + { configUpdater.accept(config); + int testTokenCount = config.getInt("num_tokens"); + if (defaultTokenCount != testTokenCount) + { + if (testTokenCount == 1) + { + // test is no-vnode, but running with vnode, so skip + Assume.assumeTrue("vnode is not supported", false); + } + else + { + Assume.assumeTrue("no-vnode is requested but not supported", defaultTokenCount > 1); + // if the test controls initial_token or GOSSIP is enabled, then the test is safe to run + if (defaultTokens.equals(config.getString("initial_token"))) + { + // test didn't define initial_token + Assume.assumeTrue("vnode is enabled and num_tokens is defined in test without GOSSIP or setting initial_token", config.has(Feature.GOSSIP)); + config.remove("initial_token"); + } + else + { + // test defined initial_token; trust it + } + } + } + } return config; } diff --cc test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java index 99ef272de0,017b11de26..f2c088c0e5 --- a/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/INodeProvisionStrategy.java @@@ -18,14 -18,14 +18,18 @@@ package org.apache.cassandra.distributed.impl; + import java.util.Map; + import javax.annotation.Nullable; + + import org.apache.cassandra.net.SocketUtils; +import org.apache.cassandra.utils.Shared; +import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES; + +@Shared(inner = INTERFACES) public interface INodeProvisionStrategy { - public enum Strategy + enum Strategy { OneNetworkInterface { diff --cc test/unit/org/apache/cassandra/net/SocketUtils.java index 78a49bddbf,6a6cd559e2..7d058d8400 --- a/test/unit/org/apache/cassandra/net/SocketUtils.java +++ b/test/unit/org/apache/cassandra/net/SocketUtils.java @@@ -19,39 -19,54 +19,52 @@@ package org.apache.cassandra.net; import java.io.IOException; + import java.net.InetAddress; import java.net.ServerSocket; - - import com.google.common.base.Throwables; + import java.net.UnknownHostException; -import com.google.common.base.Throwables; - public class SocketUtils { - public static synchronized int findAvailablePort() throws RuntimeException + /** + * Returns an available port for the given {@code bindAddress}. When an {@link IOException} occurs when opening a + * socket or if a {@link SecurityException} is raised because a manager exists and its checkListen method does + * not allow the operation, the {@code fallbackPort} is returned. + * + * @param bindAddress the ip address for the interface where we need an available port number + * @param fallbackPort a port to return in case {@link SecurityException} or {@link IOException} is encountered + * @return an available port the given {@code bindAddress} when succeeds, otherwise the {@code fallbackPort} + * @throws RuntimeException if no IP address for the {@code bindAddress} could be found + */ + public static synchronized int findAvailablePort(String bindAddress, int fallbackPort) throws RuntimeException { - ServerSocket ss = null; try { - // let the system pick an ephemeral port - ss = new ServerSocket(0); - ss.setReuseAddress(true); - return ss.getLocalPort(); + return findAvailablePort(InetAddress.getByName(bindAddress), fallbackPort); + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); } - catch (IOException e) + } + + /** + * Returns an available port for the given {@code bindAddress}. When an {@link IOException} occurs when opening a + * socket or if a {@link SecurityException} is raised because a manager exists and its checkListen method does + * not allow the operation, the {@code fallbackPort} is returned. + * + * @param bindAddress the ip address for the interface where we need an available port number + * @param fallbackPort a port to return in case {@link SecurityException} or {@link IOException} is encountered + * @return an available port the given {@code bindAddress} when succeeds, otherwise the {@code fallbackPort} + */ + public static synchronized int findAvailablePort(InetAddress bindAddress, int fallbackPort) + { + try (ServerSocket socket = new ServerSocket(0, 50, bindAddress)) { - throw Throwables.propagate(e); + return socket.getLocalPort(); } - finally + catch (SecurityException | IOException exception) { - if (ss != null) - { - try - { - ss.close(); - } - catch (IOException e) - { - Throwables.propagate(e); - } - } + return fallbackPort; } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org