This is an automated email from the ASF dual-hosted git repository. jonmeredith pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push: new 43ec184391 Add support for JMX in the in-jvm dtest framework 43ec184391 is described below commit 43ec1843918aba9e81d3c2dc1433a1ef4740a51f Author: Doug Rohrer <d...@therohrers.org> AuthorDate: Tue May 30 15:01:59 2023 -0600 Add support for JMX in the in-jvm dtest framework patch by Doug Rohrer; reviewed by Alex Petrov, Jon Meredith, Francisco Guerrero Hernandez for CASSANDRA-18511 --- .build/build-resolver.xml | 2 + build.xml | 2 +- .../org/apache/cassandra/service/GCInspector.java | 11 +- .../org/apache/cassandra/utils/JMXServerUtils.java | 29 ++- .../org/apache/cassandra/utils/MBeanWrapper.java | 272 ++++++++++++++++++--- .../utils/RMIClientSocketFactoryImpl.java | 62 +++++ .../distributed/impl/AbstractCluster.java | 15 ++ .../impl/CollectingRMIServerSocketFactoryImpl.java | 87 +++++++ .../cassandra/distributed/impl/Instance.java | 26 ++ .../cassandra/distributed/impl/InstanceConfig.java | 16 +- .../distributed/impl/IsolatedExecutor.java | 2 +- .../cassandra/distributed/impl/IsolatedJmx.java | 231 +++++++++++++++++ .../distributed/test/ResourceLeakTest.java | 60 +++++ .../distributed/test/jmx/JMXFeatureTest.java | 86 +++++++ .../distributed/test/jmx/JMXGetterCheckTest.java | 138 +++++++++++ 15 files changed, 994 insertions(+), 45 deletions(-) diff --git a/.build/build-resolver.xml b/.build/build-resolver.xml index 698fb57718..99bfe4bdc7 100644 --- a/.build/build-resolver.xml +++ b/.build/build-resolver.xml @@ -51,6 +51,8 @@ <resolver:remoterepos id="all"> <remoterepo id="resolver-central" url="${artifact.remoteRepository.central}"/> <remoterepo id="resolver-apache" url="${artifact.remoteRepository.apache}"/> + <!-- Only needed for PR builds - remove before commit --> + <!-- <remoterepo id="resolver-apache-snapshot" url="https://repository.apache.org/content/repositories/snapshots" releases="false" snapshots="true" updates="always" checksums="fail" />--> </resolver:remoterepos> <resolver:resolve> diff --git a/build.xml b/build.xml index 6985c8c45a..8f26c5e8f2 100644 --- a/build.xml +++ b/build.xml @@ -383,7 +383,7 @@ <exclusion groupId="org.hamcrest" artifactId="hamcrest-core"/> </dependency> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" scope="test"/> - <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.13" scope="test"/> + <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.15" scope="test"/> <dependency groupId="org.reflections" artifactId="reflections" version="0.10.2" scope="test"/> <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" scope="test"/> <dependency groupId="org.apache.hadoop" artifactId="hadoop-core" version="1.0.3" scope="provided"> diff --git a/src/java/org/apache/cassandra/service/GCInspector.java b/src/java/org/apache/cassandra/service/GCInspector.java index 787d79a34f..0c0f9e4704 100644 --- a/src/java/org/apache/cassandra/service/GCInspector.java +++ b/src/java/org/apache/cassandra/service/GCInspector.java @@ -137,18 +137,17 @@ public class GCInspector implements NotificationListener, GCInspectorMXBean public GCInspector() { - MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); - try { ObjectName gcName = new ObjectName(ManagementFactory.GARBAGE_COLLECTOR_MXBEAN_DOMAIN_TYPE + ",*"); - for (ObjectName name : mbs.queryNames(gcName, null)) + for (ObjectName name : MBeanWrapper.instance.queryNames(gcName, null)) { - GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(mbs, name.getCanonicalName(), GarbageCollectorMXBean.class); + GarbageCollectorMXBean gc = ManagementFactory.newPlatformMXBeanProxy(MBeanWrapper.instance.getMBeanServer(), name.getCanonicalName(), GarbageCollectorMXBean.class); gcStates.put(gc.getName(), new GCState(gc, assumeGCIsPartiallyConcurrent(gc), assumeGCIsOldGen(gc))); } - - MBeanWrapper.instance.registerMBean(this, new ObjectName(MBEAN_NAME)); + ObjectName me = new ObjectName(MBEAN_NAME); + if (!MBeanWrapper.instance.isRegistered(me)) + MBeanWrapper.instance.registerMBean(this, me); } catch (Exception e) { diff --git a/src/java/org/apache/cassandra/utils/JMXServerUtils.java b/src/java/org/apache/cassandra/utils/JMXServerUtils.java index fc36c6c065..aad4f0512c 100644 --- a/src/java/org/apache/cassandra/utils/JMXServerUtils.java +++ b/src/java/org/apache/cassandra/utils/JMXServerUtils.java @@ -33,6 +33,7 @@ import java.rmi.RemoteException; import java.rmi.registry.Registry; import java.rmi.server.RMIClientSocketFactory; import java.rmi.server.RMIServerSocketFactory; +import java.rmi.server.UnicastRemoteObject; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -44,6 +45,7 @@ import javax.rmi.ssl.SslRMIClientSocketFactory; import javax.rmi.ssl.SslRMIServerSocketFactory; import javax.security.auth.Subject; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -234,7 +236,8 @@ public class JMXServerUtils return env; } - private static void logJmxServiceUrl(InetAddress serverAddress, int port) + @VisibleForTesting + public static void logJmxServiceUrl(InetAddress serverAddress, int port) { String urlTemplate = "service:jmx:rmi://%1$s/jndi/rmi://%1$s:%2$d/jmxrmi"; String hostName; @@ -284,11 +287,12 @@ public class JMXServerUtils * Better to use the internal API than re-invent the wheel. */ @SuppressWarnings("restriction") - private static class JmxRegistry extends sun.rmi.registry.RegistryImpl { + public static class JmxRegistry extends sun.rmi.registry.RegistryImpl + { private final String lookupName; private Remote remoteServerStub; - JmxRegistry(final int port, + public JmxRegistry(final int port, final RMIClientSocketFactory csf, RMIServerSocketFactory ssf, final String lookupName) throws RemoteException { @@ -321,5 +325,24 @@ public class JMXServerUtils public void setRemoteServerStub(Remote remoteServerStub) { this.remoteServerStub = remoteServerStub; } + + /** + * Closes the underlying JMX registry by unexporting this instance. + * There is no reason to do this except for in-jvm dtests where we need + * to stop the registry, so we can start with a clean slate for future cluster + * builds, and the superclass never expects to be shut down and therefore doesn't + * handle this edge case at all. + */ + @VisibleForTesting + public void close() { + try + { + UnicastRemoteObject.unexportObject(this, true); + } + catch (NoSuchObjectException ignored) + { + // Ignore if it's already unexported + } + } } } diff --git a/src/java/org/apache/cassandra/utils/MBeanWrapper.java b/src/java/org/apache/cassandra/utils/MBeanWrapper.java index edee6af652..a76a2e7029 100644 --- a/src/java/org/apache/cassandra/utils/MBeanWrapper.java +++ b/src/java/org/apache/cassandra/utils/MBeanWrapper.java @@ -19,10 +19,15 @@ package org.apache.cassandra.utils; import java.lang.management.ManagementFactory; +import java.util.Collections; +import java.util.Set; +import java.util.UUID; import java.util.function.Consumer; import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.QueryExp; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,62 +38,160 @@ import org.slf4j.LoggerFactory; */ public interface MBeanWrapper { - static final Logger logger = LoggerFactory.getLogger(MBeanWrapper.class); + Logger logger = LoggerFactory.getLogger(MBeanWrapper.class); - static final MBeanWrapper instance = Boolean.getBoolean("org.apache.cassandra.disable_mbean_registration") ? - new NoOpMBeanWrapper() : - new PlatformMBeanWrapper(); + MBeanWrapper instance = create(); + String IS_DISABLED_MBEAN_REGISTRATION = "org.apache.cassandra.disable_mbean_registration"; + String DTEST_IS_IN_JVM_DTEST = "org.apache.cassandra.dtest.is_in_jvm_dtest"; + String MBEAN_REGISTRATION_CLASS = "org.apache.cassandra.mbean_registration_class"; + + static MBeanWrapper create() + { + // If we're running in the in-jvm dtest environment, always use the delegating + // mbean wrapper even if we start off with no-op, so it can be switched later + if (Boolean.getBoolean(DTEST_IS_IN_JVM_DTEST)) + { + return new DelegatingMbeanWrapper(getMBeanWrapper()); + } + + return getMBeanWrapper(); + } + + static MBeanWrapper getMBeanWrapper() + { + if (Boolean.getBoolean(IS_DISABLED_MBEAN_REGISTRATION)) + { + return new NoOpMBeanWrapper(); + } + + String klass = System.getProperty(MBEAN_REGISTRATION_CLASS); + if (klass == null) + { + if (Boolean.getBoolean(DTEST_IS_IN_JVM_DTEST)) + { + return new NoOpMBeanWrapper(); + } + else + { + return new PlatformMBeanWrapper(); + } + } + return FBUtilities.construct(klass, "mbean"); + } + + static ObjectName create(String mbeanName, OnException onException) + { + try + { + return new ObjectName(mbeanName); + } + catch (MalformedObjectNameException e) + { + onException.handler.accept(e); + return null; + } + } // Passing true for graceful will log exceptions instead of rethrowing them - public void registerMBean(Object obj, ObjectName mbeanName, OnException onException); + void registerMBean(Object obj, ObjectName mbeanName, OnException onException); + default void registerMBean(Object obj, ObjectName mbeanName) { registerMBean(obj, mbeanName, OnException.THROW); } - public void registerMBean(Object obj, String mbeanName, OnException onException); + default void registerMBean(Object obj, String mbeanName, OnException onException) + { + ObjectName name = create(mbeanName, onException); + if (name == null) + { + return; + } + registerMBean(obj, name, onException); + } + default void registerMBean(Object obj, String mbeanName) { registerMBean(obj, mbeanName, OnException.THROW); } - public boolean isRegistered(ObjectName mbeanName, OnException onException); + boolean isRegistered(ObjectName mbeanName, OnException onException); + default boolean isRegistered(ObjectName mbeanName) { return isRegistered(mbeanName, OnException.THROW); } - public boolean isRegistered(String mbeanName, OnException onException); + default boolean isRegistered(String mbeanName, OnException onException) + { + ObjectName name = create(mbeanName, onException); + if (name == null) + { + return false; + } + return isRegistered(name, onException); + } + default boolean isRegistered(String mbeanName) { return isRegistered(mbeanName, OnException.THROW); } - public void unregisterMBean(ObjectName mbeanName, OnException onException); + void unregisterMBean(ObjectName mbeanName, OnException onException); + default void unregisterMBean(ObjectName mbeanName) { unregisterMBean(mbeanName, OnException.THROW); } - public void unregisterMBean(String mbeanName, OnException onException); + default void unregisterMBean(String mbeanName, OnException onException) + { + ObjectName name = create(mbeanName, onException); + if (name == null) + { + return; + } + unregisterMBean(name, onException); + } + default void unregisterMBean(String mbeanName) { unregisterMBean(mbeanName, OnException.THROW); } - static class NoOpMBeanWrapper implements MBeanWrapper + enum OnException + { + THROW(e -> { throw new RuntimeException(e); }), + LOG(e -> { logger.error("Error in MBean wrapper: ", e); }), + IGNORE(e -> { }); + + private Consumer<Exception> handler; + OnException(Consumer<Exception> handler) + { + this.handler = handler; + } + } + + Set<ObjectName> queryNames(ObjectName name, QueryExp query); + + MBeanServer getMBeanServer(); + + class NoOpMBeanWrapper implements MBeanWrapper { public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) {} public void registerMBean(Object obj, String mbeanName, OnException onException) {} - public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false; } - public boolean isRegistered(String mbeanName, OnException onException) { return false; } + public boolean isRegistered(ObjectName mbeanName, OnException onException) { return false;} + public boolean isRegistered(String mbeanName, OnException onException) { return false;} public void unregisterMBean(ObjectName mbeanName, OnException onException) {} public void unregisterMBean(String mbeanName, OnException onException) {} + public Set<ObjectName> queryNames(ObjectName name, QueryExp query) {return Collections.emptySet(); } + public MBeanServer getMBeanServer() { return null; } } - static class PlatformMBeanWrapper implements MBeanWrapper + class PlatformMBeanWrapper implements MBeanWrapper { private final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) { try @@ -101,36 +204,70 @@ public interface MBeanWrapper } } - public void registerMBean(Object obj, String mbeanName, OnException onException) + public boolean isRegistered(ObjectName mbeanName, OnException onException) { try { - mbs.registerMBean(obj, new ObjectName(mbeanName)); + return mbs.isRegistered(mbeanName); } catch (Exception e) { onException.handler.accept(e); } + return false; } - public boolean isRegistered(ObjectName mbeanName, OnException onException) + public void unregisterMBean(ObjectName mbeanName, OnException onException) { try { - return mbs.isRegistered(mbeanName); + mbs.unregisterMBean(mbeanName); } catch (Exception e) { onException.handler.accept(e); } - return false; } - public boolean isRegistered(String mbeanName, OnException onException) + public Set<ObjectName> queryNames(ObjectName name, QueryExp query) + { + return mbs.queryNames(name, query); + } + + public MBeanServer getMBeanServer() + { + return mbs; + } + + } + + class InstanceMBeanWrapper implements MBeanWrapper + { + private MBeanServer mbs; + public final UUID id = UUID.randomUUID(); + + public InstanceMBeanWrapper(String hostname) + { + mbs = MBeanServerFactory.createMBeanServer(hostname + "-" + id); + } + + public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) { try { - return mbs.isRegistered(new ObjectName(mbeanName)); + mbs.registerMBean(obj, mbeanName); + } + catch (Exception e) + { + onException.handler.accept(e); + } + } + + public boolean isRegistered(ObjectName mbeanName, OnException onException) + { + try + { + return mbs.isRegistered(mbeanName); } catch (Exception e) { @@ -151,29 +288,100 @@ public interface MBeanWrapper } } - public void unregisterMBean(String mbeanName, OnException onException) + public Set<ObjectName> queryNames(ObjectName name, QueryExp query) + { + return mbs.queryNames(name, query); + } + + public MBeanServer getMBeanServer() + { + return mbs; + } + + public void close() + { + mbs.queryNames(null, null).forEach(name -> { + try + { + if (!name.getCanonicalName().contains("MBeanServerDelegate")) + { + mbs.unregisterMBean(name); + } + } + catch (Throwable e) + { + logger.debug("Could not unregister mbean {}", name.getCanonicalName()); + } + }); + MBeanServerFactory.releaseMBeanServer(mbs); + mbs = null; + } + } + + class DelegatingMbeanWrapper implements MBeanWrapper + { + MBeanWrapper delegate; + + public DelegatingMbeanWrapper(MBeanWrapper mBeanWrapper) + { + delegate = mBeanWrapper; + } + + public MBeanWrapper getDelegate() + { + return delegate; + } + + public void setDelegate(MBeanWrapper wrapper) + { + delegate = wrapper; + } + + public void registerMBean(Object obj, ObjectName mbeanName, OnException onException) { try { - mbs.unregisterMBean(new ObjectName(mbeanName)); + delegate.registerMBean(obj, mbeanName); } catch (Exception e) { onException.handler.accept(e); } } - } - public enum OnException - { - THROW(e -> { throw new RuntimeException(e); }), - LOG(e -> { logger.error("Error in MBean wrapper: ", e); }), - IGNORE(e -> {}); + public boolean isRegistered(ObjectName mbeanName, OnException onException) + { + try + { + return delegate.isRegistered(mbeanName); + } + catch (Exception e) + { + onException.handler.accept(e); + } + return false; + } - private Consumer<Exception> handler; - OnException(Consumer<Exception> handler) + public void unregisterMBean(ObjectName mbeanName, OnException onException) { - this.handler = handler; + try + { + delegate.unregisterMBean(mbeanName); + } + catch (Exception e) + { + onException.handler.accept(e); + } + } + + public Set<ObjectName> queryNames(ObjectName name, QueryExp query) + { + return delegate.queryNames(name, query); + } + + public MBeanServer getMBeanServer() + { + return delegate.getMBeanServer(); } } } diff --git a/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java new file mode 100644 index 0000000000..62ab88fb92 --- /dev/null +++ b/src/java/org/apache/cassandra/utils/RMIClientSocketFactoryImpl.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.utils; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.Socket; +import java.rmi.server.RMIClientSocketFactory; +import java.util.Objects; + +/** + * This class is used to override the local address the JMX client calculates when trying to connect, + * which can otherwise be influenced by the system property "java.rmi.server.hostname" in strange and + * unpredictable ways. + */ +public class RMIClientSocketFactoryImpl implements RMIClientSocketFactory, Serializable +{ + private final InetAddress localAddress; + + public RMIClientSocketFactoryImpl(InetAddress localAddress) + { + this.localAddress = localAddress; + } + + @Override + public Socket createSocket(String host, int port) throws IOException + { + return new Socket(localAddress, port); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + RMIClientSocketFactoryImpl that = (RMIClientSocketFactoryImpl) o; + return Objects.equals(localAddress, that.localAddress); + } + + @Override + public int hashCode() + { + return Objects.hash(localAddress); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index 6f861d64cd..cffbbc461a 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@ -77,6 +77,8 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.SimpleCondition; import org.reflections.Reflections; +import static org.apache.cassandra.utils.MBeanWrapper.DTEST_IS_IN_JVM_DTEST; + /** * AbstractCluster creates, initializes and manages Cassandra instances ({@link Instance}. * @@ -142,6 +144,9 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I private volatile Thread.UncaughtExceptionHandler previousHandler = null; + { + System.setProperty(DTEST_IS_IN_JVM_DTEST, "true"); + } protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance { private final int generation; @@ -390,6 +395,16 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster<I return instances.get(node - 1).coordinator(); } + public List<I> get(int... nodes) + { + if (nodes == null || nodes.length == 0) + throw new IllegalArgumentException("No nodes provided"); + List<I> list = new ArrayList<>(nodes.length); + for (int i : nodes) + list.add(get(i)); + return list; + } + /** * WARNING: we index from 1 here, for consistency with inet address! */ diff --git a/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java new file mode 100644 index 0000000000..5e67eafef6 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/impl/CollectingRMIServerSocketFactoryImpl.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.impl; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.SocketException; +import java.rmi.server.RMIServerSocketFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import javax.net.ServerSocketFactory; + + +/** + * This class is used to keep track of RMI servers created during a cluster creation so we can + * later close the sockets, which would otherwise be left with a thread running waiting for + * connections that would never show up as the server was otherwise closed. + */ +class CollectingRMIServerSocketFactoryImpl implements RMIServerSocketFactory +{ + private final InetAddress bindAddress; + List<ServerSocket> sockets = new ArrayList<>(); + + public CollectingRMIServerSocketFactoryImpl(InetAddress bindAddress) + { + this.bindAddress = bindAddress; + } + + @Override + public ServerSocket createServerSocket(int pPort) throws IOException + { + ServerSocket result = ServerSocketFactory.getDefault().createServerSocket(pPort, 0, bindAddress); + try + { + result.setReuseAddress(true); + } + catch (SocketException e) + { + result.close(); + throw e; + } + sockets.add(result); + return result; + } + + + public void close() throws IOException + { + for (ServerSocket socket : sockets) + { + socket.close(); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CollectingRMIServerSocketFactoryImpl that = (CollectingRMIServerSocketFactoryImpl) o; + return Objects.equals(bindAddress, that.bindAddress); + } + + @Override + public int hashCode() + { + return Objects.hash(bindAddress); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index fbef8834b4..cca6296629 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -44,6 +44,9 @@ import javax.management.ListenerNotFoundException; import javax.management.Notification; import javax.management.NotificationListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.batchlog.BatchlogManager; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.concurrent.SharedExecutorPool; @@ -123,13 +126,16 @@ import org.apache.cassandra.utils.memory.BufferPool; import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.JMX; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; public class Instance extends IsolatedExecutor implements IInvokableInstance { + private Logger inInstancelogger; // Defer creation until running in the instance context public final IInstanceConfig config; private volatile boolean initialized = false; + private IsolatedJmx isolatedJmx; // should never be invoked directly, so that it is instantiated on other class loader; // only visible for inheritance @@ -503,6 +509,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance public void startup(ICluster cluster) { sync(() -> { + inInstancelogger = LoggerFactory.getLogger(Instance.class); try { if (config.has(GOSSIP)) @@ -518,6 +525,9 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance assert config.networkTopology().contains(config.broadcastAddress()); DistributedTestSnitch.assign(config.networkTopology()); + if (config.has(JMX)) + startJmx(); + DatabaseDescriptor.daemonInitialization(); FileUtils.setFSErrorHandler(new DefaultFSErrorHandler()); DatabaseDescriptor.createAllDirectories(); @@ -604,6 +614,20 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance initialized = true; } + private void startJmx() + { + isolatedJmx = new IsolatedJmx(this, inInstancelogger); + isolatedJmx.startJmx(); + } + + private void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException + { + if (config.has(JMX)) + { + isolatedJmx.stopJmx(); + } + } + private void mkdirs() { new File(config.getString("saved_caches_directory")).mkdirs(); @@ -758,6 +782,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance CommitLog.instance::shutdownBlocking ); + error = parallelRun(error, executor, this::stopJmx); + Throwables.maybeFail(error); }).apply(isolatedExecutor); diff --git a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index e5469625d2..7f6af4b260 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@ -46,6 +46,8 @@ public class InstanceConfig implements IInstanceConfig private static final Logger logger = LoggerFactory.getLogger(InstanceConfig.class); public final int num; + private final int jmxPort; + public int num() { return num; } private final NetworkTopology networkTopology; @@ -72,7 +74,8 @@ public class InstanceConfig implements IInstanceConfig String commitlog_directory, String hints_directory, String cdc_raw_directory, - String initial_token) + String initial_token, + int jmx_port) { this.num = num; this.networkTopology = networkTopology; @@ -110,6 +113,7 @@ public class InstanceConfig implements IInstanceConfig // legacy parameters .forceSet("commitlog_sync_batch_window_in_ms", 1.0); this.featureFlags = EnumSet.noneOf(Feature.class); + this.jmxPort = jmx_port; } private InstanceConfig(InstanceConfig copy) @@ -121,6 +125,7 @@ public class InstanceConfig implements IInstanceConfig this.hostId = copy.hostId; this.featureFlags = copy.featureFlags; this.broadcastAddressAndPort = copy.broadcastAddressAndPort; + this.jmxPort = copy.jmxPort; } @@ -161,6 +166,12 @@ public class InstanceConfig implements IInstanceConfig return networkTopology().localDC(broadcastAddress()); } + @Override + public int jmxPort() + { + return this.jmxPort; + } + public InstanceConfig with(Feature featureFlag) { featureFlags.add(featureFlag); @@ -251,7 +262,8 @@ public class InstanceConfig implements IInstanceConfig String.format("%s/node%d/commitlog", root, nodeNum), String.format("%s/node%d/hints", root, nodeNum), String.format("%s/node%d/cdc", root, nodeNum), - token); + token, + 7199); } private static String[] datadirs(int datadirCount, File root, int nodeNum) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java index 53c1ad56b0..2b4fc87d3e 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java @@ -52,7 +52,7 @@ public class IsolatedExecutor implements IIsolatedExecutor { final ExecutorService isolatedExecutor; private final String name; - private final ClassLoader classLoader; + final ClassLoader classLoader; private final Method deserializeOnInstance; IsolatedExecutor(String name, ClassLoader classLoader) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java new file mode 100644 index 0000000000..b3d06590a9 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedJmx.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.impl; + +import java.lang.reflect.Field; +import java.net.InetAddress; +import java.net.MalformedURLException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXServiceURL; +import javax.management.remote.rmi.RMIConnectorServer; +import javax.management.remote.rmi.RMIJRMPServerImpl; + +import org.slf4j.Logger; + +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.utils.JMXServerUtils; +import org.apache.cassandra.utils.MBeanWrapper; +import org.apache.cassandra.utils.RMIClientSocketFactoryImpl; +import sun.rmi.transport.tcp.TCPEndpoint; + +import static org.apache.cassandra.distributed.api.Feature.JMX; +import static org.apache.cassandra.utils.MBeanWrapper.IS_DISABLED_MBEAN_REGISTRATION; + +public class IsolatedJmx +{ + /** Controls the JMX server threadpool keap-alive time. */ + private static final String SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME = "sun.rmi.transport.tcp.threadKeepAliveTime"; + /** Controls the distributed garbage collector lease time for JMX objects. */ + private static final String JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST ="java.rmi.dgc.leaseValue"; + private static final int RMI_KEEPALIVE_TIME = 1000; + + private JMXConnectorServer jmxConnectorServer; + private JMXServerUtils.JmxRegistry registry; + private RMIJRMPServerImpl jmxRmiServer; + private MBeanWrapper.InstanceMBeanWrapper wrapper; + private RMIClientSocketFactoryImpl clientSocketFactory; + private CollectingRMIServerSocketFactoryImpl serverSocketFactory; + private Logger inInstancelogger; + private IInstanceConfig config; + + public IsolatedJmx(Instance instance, Logger inInstancelogger) { + this.inInstancelogger = inInstancelogger; + config = instance.config(); + } + + public void startJmx() + { + try + { + // Several RMI threads hold references to in-jvm dtest objects, and are, by default, kept + // alive for long enough (minutes) to keep classloaders from being collected. + // Set these two system properties to a low value to allow cleanup to occur fast enough + // for GC to collect our classloaders. + System.setProperty(JAVA_RMI_DGC_LEASE_VALUE_IN_JVM_DTEST, String.valueOf(RMI_KEEPALIVE_TIME)); + System.setProperty(SUN_RMI_TRANSPORT_TCP_THREADKEEPALIVETIME, String.valueOf(RMI_KEEPALIVE_TIME)); + System.setProperty(IS_DISABLED_MBEAN_REGISTRATION, "false"); + InetAddress addr = config.broadcastAddress().getAddress(); + + int jmxPort = config.jmxPort(); + + String hostname = addr.getHostAddress(); + wrapper = new MBeanWrapper.InstanceMBeanWrapper(hostname + ":" + jmxPort); + ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(wrapper); + Map<String, Object> env = new HashMap<>(); + + serverSocketFactory = new CollectingRMIServerSocketFactoryImpl(addr); + env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE, + serverSocketFactory); + clientSocketFactory = new RMIClientSocketFactoryImpl(addr); + env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE, + clientSocketFactory); + + // configure the RMI registry + registry = new JMXServerUtils.JmxRegistry(jmxPort, + clientSocketFactory, + serverSocketFactory, + "jmxrmi"); + + // Mark the JMX server as a permanently exported object. This allows the JVM to exit with the + // server running and also exempts it from the distributed GC scheduler which otherwise would + // potentially attempt a full GC every `sun.rmi.dgc.server.gcInterval` millis (default is 3600000ms) + // For more background see: + // - CASSANDRA-2967 + // - https://www.jclarity.com/2015/01/27/rmi-system-gc-unplugged/ + // - https://bugs.openjdk.java.net/browse/JDK-6760712 + env.put("jmx.remote.x.daemon", "true"); + + // Set the port used to create subsequent connections to exported objects over RMI. This simplifies + // configuration in firewalled environments, but it can't be used in conjuction with SSL sockets. + // See: CASSANDRA-7087 + int rmiPort = config.jmxPort(); + + // We create the underlying RMIJRMPServerImpl so that we can manually bind it to the registry, + // rather then specifying a binding address in the JMXServiceURL and letting it be done automatically + // when the server is started. The reason for this is that if the registry is configured with SSL + // sockets, the JMXConnectorServer acts as its client during the binding which means it needs to + // have a truststore configured which contains the registry's certificate. Manually binding removes + // this problem. + // See CASSANDRA-12109. + jmxRmiServer = new RMIJRMPServerImpl(rmiPort, clientSocketFactory, serverSocketFactory, + env); + JMXServiceURL serviceURL = new JMXServiceURL("rmi", hostname, rmiPort); + jmxConnectorServer = new RMIConnectorServer(serviceURL, env, jmxRmiServer, wrapper.getMBeanServer()); + + jmxConnectorServer.start(); + + registry.setRemoteServerStub(jmxRmiServer.toStub()); + JMXServerUtils.logJmxServiceUrl(addr, jmxPort); + waitForJmxAvailability(hostname, jmxPort, env); + } + catch (Throwable e) + { + throw new RuntimeException("Feature.JMX was enabled but could not be started.", e); + } + } + + + private void waitForJmxAvailability(String hostname, int rmiPort, Map<String, Object> env) throws InterruptedException, MalformedURLException + { + String url = String.format("service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi", hostname, rmiPort); + JMXServiceURL serviceURL = new JMXServiceURL(url); + int attempts = 0; + Throwable lastThrown = null; + while (attempts < 20) + { + attempts++; + try (JMXConnector ignored = JMXConnectorFactory.connect(serviceURL, env)) + { + inInstancelogger.info("Connected to JMX server at {} after {} attempt(s)", + url, attempts); + return; + } + catch (MalformedURLException e) + { + throw new RuntimeException(e); + } + catch (Throwable thrown) + { + lastThrown = thrown; + } + inInstancelogger.info("Could not connect to JMX on {} after {} attempts. Will retry.", url, attempts); + Thread.sleep(1000); + } + throw new RuntimeException("Could not start JMX - unreachable after 20 attempts", lastThrown); + } + + public void stopJmx() throws IllegalAccessException, NoSuchFieldException, InterruptedException + { + if (!config.has(JMX)) + return; + // First, swap the mbean wrapper back to a NoOp wrapper + // This prevents later attempts to unregister mbeans from failing in Cassandra code, as we're going to + // unregister all of them here + ((MBeanWrapper.DelegatingMbeanWrapper) MBeanWrapper.instance).setDelegate(new MBeanWrapper.NoOpMBeanWrapper()); + try + { + wrapper.close(); + } + catch (Throwable e) + { + inInstancelogger.warn("failed to close wrapper.", e); + } + try + { + jmxConnectorServer.stop(); + } + catch (Throwable e) + { + inInstancelogger.warn("failed to close jmxConnectorServer.", e); + } + try + { + registry.close(); + } + catch (Throwable e) + { + inInstancelogger.warn("failed to close registry.", e); + } + try + { + serverSocketFactory.close(); + } + catch (Throwable e) + { + inInstancelogger.warn("failed to close serverSocketFactory.", e); + } + // The TCPEndpoint class holds references to a class in the in-jvm dtest framework + // which transitively has a reference to the InstanceClassLoader, so we need to + // make sure to remove the reference to them when the instance is shutting down + clearMapField(TCPEndpoint.class, null, "localEndpoints"); + Thread.sleep(2 * RMI_KEEPALIVE_TIME); // Double the keep-alive time to give Distributed GC some time to clean up + } + + private <K, V> void clearMapField(Class<?> clazz, Object instance, String mapName) + throws IllegalAccessException, NoSuchFieldException { + Field mapField = clazz.getDeclaredField(mapName); + mapField.setAccessible(true); + Map<K, V> map = (Map<K, V>) mapField.get(instance); + // Because multiple instances can be shutting down at once, + // synchronize on the map to avoid ConcurrentModificationException + synchronized (map) + { + for (Iterator<Map.Entry<K, V>> it = map.entrySet().iterator(); it.hasNext(); ) + { + it.next(); + it.remove(); + } + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java index 1c4850a56b..177cbb9566 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ResourceLeakTest.java @@ -28,7 +28,10 @@ import java.text.SimpleDateFormat; import java.time.Instant; import java.util.function.Consumer; import javax.management.MBeanServer; +import javax.management.MBeanServerConnection; +import javax.management.remote.JMXConnector; +import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -36,14 +39,19 @@ import com.sun.management.HotSpotDiagnosticMXBean; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.JMXUtil; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SigarLibrary; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.JMX; import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL; import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.hamcrest.Matchers.startsWith; /* Resource Leak Test - useful when tracking down issues with in-JVM framework cleanup. * All objects referencing the InstanceClassLoader need to be garbage collected or @@ -141,6 +149,11 @@ public class ResourceLeakTest extends TestBaseImpl } void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater) throws Throwable + { + doTest(numClusterNodes, updater, ignored -> {}); + } + + void doTest(int numClusterNodes, Consumer<IInstanceConfig> updater, Consumer<Cluster> actionToPerform) throws Throwable { for (int loop = 0; loop < numTestLoops; loop++) { @@ -155,6 +168,7 @@ public class ResourceLeakTest extends TestBaseImpl cluster.schemaChange("CREATE TABLE " + KEYSPACE + "." + tableName + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + "." + tableName + "(pk,ck,v) VALUES (0,0,0)", ConsistencyLevel.ALL); cluster.get(1).callOnInstance(() -> FBUtilities.waitOnFutures(Keyspace.open(KEYSPACE).flush())); + actionToPerform.accept(cluster); if (dumpEveryLoop) { dumpResources(String.format("loop%03d", loop)); @@ -213,4 +227,50 @@ public class ResourceLeakTest extends TestBaseImpl } dumpResources("final-native"); } + + @Test + public void looperJmxTest() throws Throwable + { + doTest(1, config -> config.with(JMX), cluster -> { + // NOTE: At some point, the hostname of the broadcastAddress can be resolved + // and then the `getHostString`, which would otherwise return the IP address, + // starts returning `localhost` - use `.getAddress().getHostAddress()` to work around this. + for (IInvokableInstance instance:cluster.get(1, cluster.size())) + { + IInstanceConfig config = instance.config(); + try (JMXConnector jmxc = JMXUtil.getJmxConnector(config)) + { + MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); + // instances get their default domain set to their IP address, so us it + // to check that we are actually connecting to the correct instance + String defaultDomain = mbsc.getDefaultDomain(); + Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort())); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + }); + if (forceCollection) + { + System.runFinalization(); + System.gc(); + Thread.sleep(finalWaitMillis); + } + dumpResources("final-jmx"); + } + + @Test + public void looperEverythingTest() throws Throwable + { + doTest(1, config -> config.with(Feature.values())); + if (forceCollection) + { + System.runFinalization(); + System.gc(); + Thread.sleep(finalWaitMillis); + } + dumpResources("final-everything"); + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java new file mode 100644 index 0000000000..1c38bd11e9 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXFeatureTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.jmx; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import javax.management.MBeanServerConnection; +import javax.management.remote.JMXConnector; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.shared.JMXUtil; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +import static org.hamcrest.Matchers.startsWith; + +public class JMXFeatureTest extends TestBaseImpl +{ + /** + * Test the in-jvm dtest JMX feature. + * - Create a cluster with multiple JMX servers, one per instance + * - Test that when connecting, we get the correct MBeanServer by checking the default domain, which is set to the IP of the instance + * - Run the test multiple times to ensure cleanup of the JMX servers is complete so the next test can run successfully using the same host/port. + * NOTE: In later versions of Cassandra, there is also a `testOneNetworkInterfaceProvisioning` that leverages the ability to specify + * ports in addition to IP/Host for binding, but this version does not support that feature. Keeping the test name the same + * so that it's consistent across versions. + * + * @throws Exception + */ + @Test + public void testMultipleNetworkInterfacesProvisioning() throws Exception + { + int iterations = 2; // Make sure the JMX infrastructure all cleans up properly by running this multiple times. + Set<String> allInstances = new HashSet<>(); + for (int i = 0; i < iterations; i++) + { + try (Cluster cluster = Cluster.build(2).withConfig(c -> c.with(Feature.values())).start()) + { + Set<String> instancesContacted = new HashSet<>(); + for (IInvokableInstance instance : cluster.get(1, 2)) + { + testInstance(instancesContacted, instance); + } + Assert.assertEquals("Should have connected with both JMX instances.", 2, instancesContacted.size()); + allInstances.addAll(instancesContacted); + } + } + Assert.assertEquals("Each instance from each cluster should have been unique", iterations * 2, allInstances.size()); + } + + private void testInstance(Set<String> instancesContacted, IInvokableInstance instance) throws IOException + { + IInstanceConfig config = instance.config(); + try (JMXConnector jmxc = JMXUtil.getJmxConnector(config)) + { + MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); + // instances get their default domain set to their IP address, so us it + // to check that we are actually connecting to the correct instance + String defaultDomain = mbsc.getDefaultDomain(); + instancesContacted.add(defaultDomain); + Assert.assertThat(defaultDomain, startsWith(JMXUtil.getJmxHost(config) + ":" + config.jmxPort())); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java new file mode 100644 index 0000000000..a3f74df60d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/jmx/JMXGetterCheckTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.distributed.test.jmx; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import javax.management.JMRuntimeException; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanOperationInfo; +import javax.management.MBeanServerConnection; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +public class JMXGetterCheckTest extends TestBaseImpl +{ + private static final Set<String> IGNORE_ATTRIBUTES = ImmutableSet.of( + "org.apache.cassandra.net:type=MessagingService:BackPressurePerHost" // throws unsupported saying the feature was removed... dropped in CASSANDRA-15375 + ); + private static final Set<String> IGNORE_OPERATIONS = ImmutableSet.of( + "org.apache.cassandra.db:type=StorageService:stopDaemon", // halts the instance, which then causes the JVM to exit + "org.apache.cassandra.db:type=StorageService:drain", // don't drain, it stops things which can cause other APIs to be unstable as we are in a stopped state + "org.apache.cassandra.db:type=StorageService:stopGossiping", // if we stop gossip this can cause other issues, so avoid + "org.apache.cassandra.db:type=StorageService:resetLocalSchema", // this will fail when there are no other nodes which can serve schema + "org.apache.cassandra.db:type=HintedHandoffManager:listEndpointsPendingHints", // this will fail because it only exists to match an old, deprecated mbean and just throws an UnsportedOperationException + "org.apache.cassandra.db:type=StorageService:decommission" // Don't decommission nodes! Note that in future versions of C* this is unnecessary because decommission takes an argument. + ); + + public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi"; + + @Test + public void testGetters() throws Exception + { + try (Cluster cluster = Cluster.build(1).withConfig(c -> c.with(Feature.values())).start()) + { + IInvokableInstance instance = cluster.get(1); + + String jmxHost = instance.config().broadcastAddress().getAddress().getHostAddress(); + String url = String.format(JMX_SERVICE_URL_FMT, jmxHost, instance.config().jmxPort()); + List<Named> errors = new ArrayList<>(); + try (JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(url), null)) + { + MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); + Set<ObjectName> metricNames = new TreeSet<>(mbsc.queryNames(null, null)); + for (ObjectName name : metricNames) + { + if (!name.getDomain().startsWith("org.apache.cassandra")) + continue; + MBeanInfo info = mbsc.getMBeanInfo(name); + for (MBeanAttributeInfo a : info.getAttributes()) + { + String fqn = String.format("%s:%s", name, a.getName()); + if (!a.isReadable() || IGNORE_ATTRIBUTES.contains(fqn)) + continue; + try + { + mbsc.getAttribute(name, a.getName()); + } + catch (JMRuntimeException e) + { + errors.add(new Named(String.format("Attribute %s", fqn), e.getCause())); + } + } + + for (MBeanOperationInfo o : info.getOperations()) + { + String fqn = String.format("%s:%s", name, o.getName()); + if (o.getSignature().length != 0 || IGNORE_OPERATIONS.contains(fqn)) + continue; + try + { + mbsc.invoke(name, o.getName(), new Object[0], new String[0]); + } + catch (JMRuntimeException e) + { + errors.add(new Named(String.format("Operation %s", fqn), e.getCause())); + } + } + } + } + if (!errors.isEmpty()) + { + AssertionError root = new AssertionError(); + errors.forEach(root::addSuppressed); + throw root; + } + } + } + + /** + * This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneeded JMX/Reflection logic cluttering the stacktrace + */ + private static class Named extends RuntimeException + { + public Named(String msg, Throwable cause) + { + super(msg + "\nCaused by: " + cause.getClass().getCanonicalName() + ": " + cause.getMessage(), cause.getCause()); + StackTraceElement[] stack = cause.getStackTrace(); + List<StackTraceElement> copy = new ArrayList<>(); + for (StackTraceElement s : stack) + { + if (!s.getClassName().startsWith("org.apache.cassandra")) + break; + copy.add(s); + } + Collections.reverse(copy); + setStackTrace(copy.toArray(new StackTraceElement[0])); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org