This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 208a71c00841ae9e79a2f17496beb61d42486094 Merge: 441285d58e 57293e8281 Author: Jon Meredith <jonmered...@apache.org> AuthorDate: Wed May 31 12:04:58 2023 -0600 Merge branch 'cassandra-4.0' into cassandra-4.1 build.xml | 9 +- relocate-dependencies.pom | 2 +- .../config/CassandraRelevantProperties.java | 16 ++ .../org/apache/cassandra/service/GCInspector.java | 8 +- .../org/apache/cassandra/utils/JMXServerUtils.java | 28 ++- .../org/apache/cassandra/utils/MBeanWrapper.java | 212 ++++++++++++++++++- .../utils/RMIClientSocketFactoryImpl.java | 62 ++++++ .../apache/cassandra/utils/ReflectionUtils.java | 55 +++++ .../distributed/impl/AbstractCluster.java | 12 ++ .../impl/CollectingRMIServerSocketFactoryImpl.java | 87 ++++++++ .../distributed/impl/INodeProvisionStrategy.java | 11 + .../cassandra/distributed/impl/Instance.java | 33 +++ .../cassandra/distributed/impl/InstanceConfig.java | 16 +- .../cassandra/distributed/impl/IsolatedJmx.java | 230 +++++++++++++++++++++ .../distributed/test/ResourceLeakTest.java | 62 +++++- .../distributed/test/jmx/JMXFeatureTest.java | 113 ++++++++++ .../distributed/test/jmx/JMXGetterCheckTest.java | 26 +-- .../distributed/test/metric/TableMetricTest.java | 29 ++- 18 files changed, 962 insertions(+), 49 deletions(-) diff --cc build.xml index e56c1f8e8a,f50cb9bf4d..da21569de7 --- a/build.xml +++ b/build.xml @@@ -152,8 -146,6 +152,8 @@@ <property name="chronicle-wire.version" value="2.20.117" /> <property name="chronicle-threads.version" value="2.20.111" /> - <property name="dtest-api.version" value="0.0.13" /> ++ <property name="dtest-api.version" value="0.0.15" /> + <condition property="maven-ant-tasks.jar.exists"> <available file="${build.dir}/maven-ant-tasks-${maven-ant-tasks.version}.jar" /> </condition> diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 3e45ebc3ed,67637ac76e..75e36b4693 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@@ -226,14 -209,9 +239,17 @@@ public enum CassandraRelevantPropertie /** what class to use for mbean registeration */ MBEAN_REGISTRATION_CLASS("org.apache.cassandra.mbean_registration_class"), + /** This property indicates if the code is running under the in-jvm dtest framework */ + DTEST_IS_IN_JVM_DTEST("org.apache.cassandra.dtest.is_in_jvm_dtest"), + + BATCH_COMMIT_LOG_SYNC_INTERVAL("cassandra.batch_commitlog_sync_interval_millis", "1000"), + + SYSTEM_AUTH_DEFAULT_RF("cassandra.system_auth.default_rf", "1"), + SYSTEM_TRACES_DEFAULT_RF("cassandra.system_traces.default_rf", "2"), + SYSTEM_DISTRIBUTED_DEFAULT_RF("cassandra.system_distributed.default_rf", "3"), + + MEMTABLE_OVERHEAD_SIZE("cassandra.memtable.row_overhead_size", "-1"), + MEMTABLE_OVERHEAD_COMPUTE_STEPS("cassandra.memtable_row_overhead_computation_step", "100000"), MIGRATION_DELAY("cassandra.migration_delay_ms", "60000"), /** Defines how often schema definitions are pulled from the other nodes */ SCHEMA_PULL_INTERVAL_MS("cassandra.schema_pull_interval_ms", "60000"), diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 7d333726e5,fcbef4c220..47371772b9 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@@ -183,9 -163,10 +183,11 @@@ public abstract class AbstractCluster< extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B> { private INodeProvisionStrategy.Strategy nodeProvisionStrategy = INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces; + private ShutdownExecutor shutdownExecutor = DEFAULT_SHUTDOWN_EXECUTOR; { + // Indicate that we are running in the in-jvm dtest environment + CassandraRelevantProperties.DTEST_IS_IN_JVM_DTEST.setBoolean(true); // those properties may be set for unit-test optimizations; those should not be used when running dtests CassandraRelevantProperties.FLUSH_LOCAL_SCHEMA_CHANGES.reset(); CassandraRelevantProperties.NON_GRACEFUL_SHUTDOWN.reset(); diff --cc test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java index 0000000000,0f16286b4d..5e67eafef6 mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java @@@ -1,0 -1,87 +1,87 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.impl; + + import java.io.IOException; + import java.net.InetAddress; + import java.net.ServerSocket; + import java.net.SocketException; + import java.rmi.server.RMIServerSocketFactory; + import java.util.ArrayList; + import java.util.List; + import java.util.Objects; - + import javax.net.ServerSocketFactory; + ++ + /** + * This class is used to keep track of RMI servers created during a cluster creation so we can + * later close the sockets, which would otherwise be left with a thread running waiting for + * connections that would never show up as the server was otherwise closed. + */ + class CollectingRMIServerSocketFactoryImpl implements RMIServerSocketFactory + { + private final InetAddress bindAddress; + List<ServerSocket> sockets = new ArrayList<>(); + + public CollectingRMIServerSocketFactoryImpl(InetAddress bindAddress) + { + this.bindAddress = bindAddress; + } + + @Override + public ServerSocket createServerSocket(int pPort) throws IOException + { + ServerSocket result = ServerSocketFactory.getDefault().createServerSocket(pPort, 0, bindAddress); + try + { + result.setReuseAddress(true); + } + catch (SocketException e) + { + result.close(); + throw e; + } + sockets.add(result); + return result; + } + + + public void close() throws IOException + { + for (ServerSocket socket : sockets) + { + socket.close(); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CollectingRMIServerSocketFactoryImpl that = (CollectingRMIServerSocketFactoryImpl) o; + return Objects.equals(bindAddress, that.bindAddress); + } + + @Override + public int hashCode() + { + return Objects.hash(bindAddress); + } + } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index d4cb1cb9ed,12ef7a53d4..9975aeb9eb --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -143,51 -132,47 +145,63 @@@ import org.apache.cassandra.utils.Close import org.apache.cassandra.utils.DiagnosticSnapshotService; import org.apache.cassandra.utils.ExecutorUtils; import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.JMXServerUtils; import org.apache.cassandra.utils.JVMStabilityInspector; + import org.apache.cassandra.utils.MBeanWrapper; + import org.apache.cassandra.utils.RMIClientSocketFactoryImpl; import org.apache.cassandra.utils.Throwables; -import org.apache.cassandra.utils.UUIDSerializer; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.memory.BufferPools; import org.apache.cassandra.utils.progress.jmx.JMXBroadcastExecutor; import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; +import static org.apache.cassandra.distributed.api.Feature.BLANK_GOSSIP; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; + import static org.apache.cassandra.distributed.api.Feature.JMX; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.fromCassandraInetAddressAndPort; import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort; import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; +/** + * This class is instantiated on the relevant classloader, so its methods invoke the correct target classes automatically + */ public class Instance extends IsolatedExecutor implements IInvokableInstance { + private static final int RMI_KEEPALIVE_TIME = 1000; private Logger inInstancelogger; // Defer creation until running in the instance context public final IInstanceConfig config; private volatile boolean initialized = false; private volatile boolean internodeMessagingStarted = false; private final AtomicLong startedAt = new AtomicLong(); + private JMXConnectorServer jmxConnectorServer; + private JMXServerUtils.JmxRegistry registry; + private RMIJRMPServerImpl jmxRmiServer; + private MBeanWrapper.InstanceMBeanWrapper wrapper; + private RMIClientSocketFactoryImpl clientSocketFactory; + private CollectingRMIServerSocketFactoryImpl serverSocketFactory; + private IsolatedJmx isolatedJmx; - // should never be invoked directly, so that it is instantiated on other class loader; - // only visible for inheritance + @Deprecated Instance(IInstanceConfig config, ClassLoader classLoader) { - super("node" + config.num(), classLoader); + this(config, classLoader, null); + } + + Instance(IInstanceConfig config, ClassLoader classLoader, FileSystem fileSystem) + { + this(config, classLoader, fileSystem, null); + } + + Instance(IInstanceConfig config, ClassLoader classLoader, FileSystem fileSystem, ShutdownExecutor shutdownExecutor) + { + super("node" + config.num(), classLoader, executorFactory().pooled("isolatedExecutor", Integer.MAX_VALUE), shutdownExecutor); this.config = config; + if (fileSystem != null) + File.unsafeSetFilesystem(fileSystem); Object clusterId = Objects.requireNonNull(config.get(Constants.KEY_DTEST_API_CLUSTER_ID), "cluster_id is not defined"); ClusterIDDefiner.setId("cluster-" + clusterId); InstanceIDDefiner.setInstanceId(config.num()); @@@ -738,6 -641,20 +755,20 @@@ initialized = true; } + private void startJmx() + { - isolatedJmx = new IsolatedJmx(this, inInstancelogger); ++ this.isolatedJmx = new IsolatedJmx(this, inInstancelogger); + isolatedJmx.startJmx(); + } + - private void stopJmx() throws NoSuchFieldException, InterruptedException, IllegalAccessException ++ private void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException + { + if (config.has(JMX)) + { + isolatedJmx.stopJmx(); + } + } + // Update the messaging versions for all instances // that have initialized their configurations. private static void propagateMessagingVersions(ICluster cluster) @@@ -872,25 -842,13 +903,27 @@@ // CommitLog must shut down after Stage, or threads from the latter may attempt to use the former. // (ex. A Mutation stage thread may attempt to add a mutation to the CommitLog.) error = parallelRun(error, executor, CommitLog.instance::shutdownBlocking); - error = parallelRun(error, executor, () -> shutdownAndWait(Collections.singletonList(JMXBroadcastExecutor.executor))); - + error = parallelRun(error, executor, + () -> PendingRangeCalculatorService.instance.shutdownAndWait(1L, MINUTES), + () -> shutdownAndWait(Collections.singletonList(JMXBroadcastExecutor.executor)) + ); + // ScheduledExecutors shuts down after MessagingService, as MessagingService may issue tasks to it. - error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownAndWait(1L, MINUTES)); + error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownNowAndWait(1L, MINUTES)); + + error = parallelRun(error, executor, this::stopJmx); + // Make sure any shutdown hooks registered for DeleteOnExit are released to prevent + // references to the instance class loaders from being held + if (graceful) + { + PathUtils.runOnExitThreadsAndClear(); + } + else + { + PathUtils.clearOnExitThreads(); + } + Throwables.maybeFail(error); }).apply(isolatedExecutor); diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index 92c56d6267,06229a0c01..3c515d57f0 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@@ -37,10 -38,17 +37,12 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.upgrade.UpgradeTestBase; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.SimpleSeedProvider; -import org.apache.cassandra.utils.Shared; -@Shared public class InstanceConfig implements IInstanceConfig { - private static final Object NULL = new Object(); - private static final Logger logger = LoggerFactory.getLogger(InstanceConfig.class); - public final int num; + private final int jmxPort; + public int num() { return num; } private final NetworkTopology networkTopology; @@@ -69,9 -77,10 +71,10 @@@ String commitlog_directory, String hints_directory, String cdc_raw_directory, - String initial_token, + Collection<String> initial_token, int storage_port, - int native_transport_port) + int native_transport_port, + int jmx_port) { this.num = num; this.networkTopology = networkTopology; @@@ -107,12 -115,13 +110,13 @@@ .set("diagnostic_events_enabled", true) .set("auto_bootstrap", false) // capacities that are based on `totalMemory` that should be fixed size - .set("index_summary_capacity_in_mb", 50l) - .set("counter_cache_size_in_mb", 50l) - .set("key_cache_size_in_mb", 50l) + .set("index_summary_capacity", "50MiB") + .set("counter_cache_size", "50MiB") + .set("key_cache_size", "50MiB") // legacy parameters - .forceSet("commitlog_sync_batch_window_in_ms", 1.0); + .forceSet("commitlog_sync_batch_window_in_ms", "1"); this.featureFlags = EnumSet.noneOf(Feature.class); + this.jmxPort = jmx_port; } private InstanceConfig(InstanceConfig copy) @@@ -124,8 -133,10 +128,9 @@@ this.hostId = copy.hostId; this.featureFlags = copy.featureFlags; this.broadcastAddressAndPort = copy.broadcastAddressAndPort; + this.jmxPort = copy.jmxPort; } - @Override public InetSocketAddress broadcastAddress() { @@@ -265,14 -280,15 +276,15 @@@ String.format("%s/node%d/commitlog", root, nodeNum), String.format("%s/node%d/hints", root, nodeNum), String.format("%s/node%d/cdc", root, nodeNum), - token, + tokens, provisionStrategy.storagePort(nodeNum), - provisionStrategy.nativeTransportPort(nodeNum)); + provisionStrategy.nativeTransportPort(nodeNum), + provisionStrategy.jmxPort(nodeNum)); } - private static String[] datadirs(int datadirCount, File root, int nodeNum) + private static String[] datadirs(int datadirCount, Path root, int nodeNum) { - String datadirFormat = String.format("%s/node%d/data%%d", root.getPath(), nodeNum); + String datadirFormat = String.format("%s/node%d/data%%d", root, nodeNum); String [] datadirs = new String[datadirCount]; for (int i = 0; i < datadirs.length; i++) datadirs[i] = String.format(datadirFormat, i); diff --cc test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java index a5a2dce171,13fe696233..794873feea --- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java @@@ -24,23 -25,30 +24,31 @@@ import java.nio.file.FileSystems import java.nio.file.Path; import java.sql.Date; import java.text.SimpleDateFormat; -import java.time.Instant; import java.util.function.Consumer; import javax.management.MBeanServer; + import javax.management.MBeanServerConnection; + import javax.management.remote.JMXConnector; - import org.apache.cassandra.io.util.File; + import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; import com.sun.management.HotSpotDiagnosticMXBean; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; + import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; + import org.apache.cassandra.distributed.api.IInvokableInstance; + import org.apache.cassandra.distributed.shared.JMXUtil; ++import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.SigarLibrary; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; + import static org.apache.cassandra.distributed.api.Feature.JMX; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.utils.FBUtilities.now; + import static org.hamcrest.Matchers.startsWith; /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup. * All objects referencing the InstanceClassLoader need to be garbage collected or --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org