ctubbsii commented on code in PR #5811: URL: https://github.com/apache/accumulo/pull/5811#discussion_r2289446786
########## test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT_SimpleSuite.java: ########## Review Comment: Is this really a SimpleSuite test case? The SimpleSuite tests use a shared mini cluster for all tests in the suite, because they don't do anything that would mess up the cluster state that would prevent a different test from running. I haven't finished looking at this entire test, but the nature of it seems like it is intentionally changing the state (killing processes). ########## test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT_SimpleSuite.java: ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.functional.ExitCodesIT_SimpleSuite.ProcessProxy.TerminalBehavior; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.accumulo.tserver.TabletServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.ExceptionMethod; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.MethodCall; +import net.bytebuddy.matcher.ElementMatchers; + +@Tag(MINI_CLUSTER_ONLY) +public class ExitCodesIT_SimpleSuite extends SharedMiniClusterBase { + + public static class ServerContextFunction implements Function<SiteConfiguration,ServerContext> { + + @Override + public ServerContext apply(SiteConfiguration site) { + return new ServerContext(site); + } + + } + + // Dynamic Proxy class for a server instance that will be invoked via + // Accumulo's Main class using MiniAccumuloClusterImpl._exec(). This + // ProcessProxy class will determine which class to instantiate + // and what the terminal behavior should be using two system + // properties. A subclass of the desired class is dynamically created + // and started. + public static class ProcessProxy { + + public static enum TerminalBehavior { + SHUTDOWN, EXCEPTION, ERROR + }; + + public static final String PROXY_CLASS = "proxy.server"; + public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior"; + + public static void main(String[] args) throws Exception { + + final String proxyServer = System.getProperty(PROXY_CLASS); + Objects.requireNonNull(proxyServer, PROXY_CLASS + " must exist as an env var"); + final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR); + Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as an env var"); + + final ServerType st = ServerType.valueOf(proxyServer); + final TerminalBehavior behavior = TerminalBehavior.valueOf(methodBehavior); + + // Determine the constructor arguments and parameters for each server class. + // Find a method with no-args that does not return anything that is + // called during the servers run method that we can intercept to signal + // shutdown, exception, or error. Review Comment: Good comment... but this is wild :sweat_smile: ########## server/base/src/main/java/org/apache/accumulo/server/ServerContext.java: ########## @@ -522,9 +522,11 @@ public void close() { getMetricsInfo().close(); } if (sharedSchedExecutorCreated.get()) { + log.info("Shutting down shared executor pool"); Review Comment: Also debug here ########## server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java: ########## @@ -829,237 +829,247 @@ public void run() { getConfiguration().getTimeInMillis(Property.COMPACTOR_CANCEL_CHECK_INTERVAL)); LOG.info("Compactor started, waiting for work"); - try { - - final AtomicReference<Throwable> err = new AtomicReference<>(); - final LogSorter logSorter = new LogSorter(this); - long nextSortLogsCheckTime = System.currentTimeMillis(); - while (!isShutdownRequested()) { - if (Thread.currentThread().isInterrupted()) { - LOG.info("Server process thread has been interrupted, shutting down"); - break; - } - try { - // mark compactor as idle while not in the compaction loop - updateIdleStatus(true); + final AtomicReference<Throwable> err = new AtomicReference<>(); + final LogSorter logSorter = new LogSorter(this); + long nextSortLogsCheckTime = System.currentTimeMillis(); Review Comment: Not for this PR, but I wonder if this needs to use the system clock or can be made to use relative time (nanoTime). ########## test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT_SimpleSuite.java: ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.functional.ExitCodesIT_SimpleSuite.ProcessProxy.TerminalBehavior; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.accumulo.tserver.TabletServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.ExceptionMethod; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.MethodCall; +import net.bytebuddy.matcher.ElementMatchers; + +@Tag(MINI_CLUSTER_ONLY) +public class ExitCodesIT_SimpleSuite extends SharedMiniClusterBase { + + public static class ServerContextFunction implements Function<SiteConfiguration,ServerContext> { + + @Override + public ServerContext apply(SiteConfiguration site) { + return new ServerContext(site); + } + + } + + // Dynamic Proxy class for a server instance that will be invoked via + // Accumulo's Main class using MiniAccumuloClusterImpl._exec(). This + // ProcessProxy class will determine which class to instantiate + // and what the terminal behavior should be using two system + // properties. A subclass of the desired class is dynamically created + // and started. + public static class ProcessProxy { + + public static enum TerminalBehavior { + SHUTDOWN, EXCEPTION, ERROR + }; + + public static final String PROXY_CLASS = "proxy.server"; + public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior"; + + public static void main(String[] args) throws Exception { + + final String proxyServer = System.getProperty(PROXY_CLASS); + Objects.requireNonNull(proxyServer, PROXY_CLASS + " must exist as an env var"); + final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR); + Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as an env var"); + + final ServerType st = ServerType.valueOf(proxyServer); + final TerminalBehavior behavior = TerminalBehavior.valueOf(methodBehavior); + + // Determine the constructor arguments and parameters for each server class. + // Find a method with no-args that does not return anything that is + // called during the servers run method that we can intercept to signal + // shutdown, exception, or error. + final Class<? extends AbstractServer> serverClass; + final String methodName; + final Class<?>[] ctorParams; + final Object[] ctorArgs; + switch (st) { + case COMPACTOR: + List<String> compactorArgs = new ArrayList<>(); + compactorArgs.add("-o"); + compactorArgs.add(Property.COMPACTOR_GROUP_NAME.getKey() + "=TEST"); + serverClass = Compactor.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), compactorArgs.toArray(new String[] {})}; + break; + case GARBAGE_COLLECTOR: + serverClass = SimpleGarbageCollector.class; + methodName = "logStats"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new String[] {}}; + break; + case MANAGER: + serverClass = Manager.class; + methodName = "mainWait"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), new String[] {}}; + break; + case SCAN_SERVER: + List<String> scanServerArgs = new ArrayList<>(); + scanServerArgs.add("-o"); + scanServerArgs.add(Property.SSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = ScanServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), scanServerArgs.toArray(new String[] {})}; + break; + case TABLET_SERVER: + List<String> tabletServerArgs = new ArrayList<>(); + tabletServerArgs.add("-o"); + tabletServerArgs.add(Property.TSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = TabletServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), + tabletServerArgs.toArray(new String[] {})}; + break; + case MONITOR: + case ZOOKEEPER: + default: + throw new UnsupportedOperationException(st + " is not currently supported"); + } + + final Implementation implementation; + switch (behavior) { + case ERROR: + implementation = + ExceptionMethod.throwing(StackOverflowError.class, "throwing unknown error"); + break; + case EXCEPTION: + implementation = + ExceptionMethod.throwing(RuntimeException.class, "throwing runtime exception"); + break; + case SHUTDOWN: + implementation = + MethodCall.invoke(AbstractServer.class.getMethod("requestShutdownForTests")); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + + // Dynamically create the subclass with the specified behavior, load it + // into the JVM, and return it's Class. + // creates a subclass of server class defining a public constructor that + // takes an Integer, but calls the server class constructor with the + // appropriate args. Calls to the intercepted method name are intercepted + // by the supplied implementation + final Class<? extends AbstractServer> dynamicServerType = + new ByteBuddy().subclass(serverClass, ConstructorStrategy.Default.NO_CONSTRUCTORS) + .name("org.apache.accumulo.test.functional.exit_codes." + serverClass.getSimpleName() + + "_" + behavior.name()) + .defineConstructor(Visibility.PUBLIC).withParameters(Integer.class) + .intercept(MethodCall.invoke(serverClass.getDeclaredConstructor(ctorParams)).onSuper() + .with(ctorArgs)) + .method(ElementMatchers.named(methodName)).intercept(implementation).make() + .load(serverClass.getClassLoader()).getLoaded(); + + // Find and invoke the constructor of the dynamically created class + final Constructor<? extends AbstractServer> ctor = + dynamicServerType.getDeclaredConstructor(Integer.class); + final AbstractServer dynamicServerInstance = ctor.newInstance(1); + + // Start the server process + AbstractServer.startServer(dynamicServerInstance, LoggerFactory.getLogger(serverClass)); + } + + } + + @BeforeAll + public static void beforeTests() throws Exception { + // Start MiniCluster so that getCluster() does not + // return null, + SharedMiniClusterBase.startMiniCluster(); + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); Review Comment: I don't think you can do this as part of the SimpleSuite tests. This method would only get run when running it as a standalone test. I don't think it would be called, and thus the GC wouldn't be stopped, when run as part of the suite. ########## test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT_SimpleSuite.java: ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.functional.ExitCodesIT_SimpleSuite.ProcessProxy.TerminalBehavior; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.accumulo.tserver.TabletServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.ExceptionMethod; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.MethodCall; +import net.bytebuddy.matcher.ElementMatchers; + +@Tag(MINI_CLUSTER_ONLY) +public class ExitCodesIT_SimpleSuite extends SharedMiniClusterBase { + + public static class ServerContextFunction implements Function<SiteConfiguration,ServerContext> { + + @Override + public ServerContext apply(SiteConfiguration site) { + return new ServerContext(site); + } + + } + + // Dynamic Proxy class for a server instance that will be invoked via + // Accumulo's Main class using MiniAccumuloClusterImpl._exec(). This + // ProcessProxy class will determine which class to instantiate + // and what the terminal behavior should be using two system + // properties. A subclass of the desired class is dynamically created + // and started. + public static class ProcessProxy { + + public static enum TerminalBehavior { + SHUTDOWN, EXCEPTION, ERROR + }; + + public static final String PROXY_CLASS = "proxy.server"; + public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior"; + + public static void main(String[] args) throws Exception { + + final String proxyServer = System.getProperty(PROXY_CLASS); + Objects.requireNonNull(proxyServer, PROXY_CLASS + " must exist as an env var"); + final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR); + Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as an env var"); + + final ServerType st = ServerType.valueOf(proxyServer); + final TerminalBehavior behavior = TerminalBehavior.valueOf(methodBehavior); + + // Determine the constructor arguments and parameters for each server class. + // Find a method with no-args that does not return anything that is + // called during the servers run method that we can intercept to signal + // shutdown, exception, or error. + final Class<? extends AbstractServer> serverClass; + final String methodName; + final Class<?>[] ctorParams; + final Object[] ctorArgs; + switch (st) { + case COMPACTOR: + List<String> compactorArgs = new ArrayList<>(); + compactorArgs.add("-o"); + compactorArgs.add(Property.COMPACTOR_GROUP_NAME.getKey() + "=TEST"); + serverClass = Compactor.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), compactorArgs.toArray(new String[] {})}; + break; + case GARBAGE_COLLECTOR: + serverClass = SimpleGarbageCollector.class; + methodName = "logStats"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new String[] {}}; + break; + case MANAGER: + serverClass = Manager.class; + methodName = "mainWait"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), new String[] {}}; + break; + case SCAN_SERVER: + List<String> scanServerArgs = new ArrayList<>(); + scanServerArgs.add("-o"); + scanServerArgs.add(Property.SSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = ScanServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), scanServerArgs.toArray(new String[] {})}; + break; + case TABLET_SERVER: + List<String> tabletServerArgs = new ArrayList<>(); + tabletServerArgs.add("-o"); + tabletServerArgs.add(Property.TSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = TabletServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), + tabletServerArgs.toArray(new String[] {})}; + break; + case MONITOR: + case ZOOKEEPER: + default: + throw new UnsupportedOperationException(st + " is not currently supported"); + } + + final Implementation implementation; + switch (behavior) { + case ERROR: + implementation = + ExceptionMethod.throwing(StackOverflowError.class, "throwing unknown error"); + break; + case EXCEPTION: + implementation = + ExceptionMethod.throwing(RuntimeException.class, "throwing runtime exception"); + break; + case SHUTDOWN: + implementation = + MethodCall.invoke(AbstractServer.class.getMethod("requestShutdownForTests")); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + + // Dynamically create the subclass with the specified behavior, load it + // into the JVM, and return it's Class. + // creates a subclass of server class defining a public constructor that + // takes an Integer, but calls the server class constructor with the + // appropriate args. Calls to the intercepted method name are intercepted + // by the supplied implementation + final Class<? extends AbstractServer> dynamicServerType = + new ByteBuddy().subclass(serverClass, ConstructorStrategy.Default.NO_CONSTRUCTORS) + .name("org.apache.accumulo.test.functional.exit_codes." + serverClass.getSimpleName() + + "_" + behavior.name()) + .defineConstructor(Visibility.PUBLIC).withParameters(Integer.class) + .intercept(MethodCall.invoke(serverClass.getDeclaredConstructor(ctorParams)).onSuper() + .with(ctorArgs)) + .method(ElementMatchers.named(methodName)).intercept(implementation).make() + .load(serverClass.getClassLoader()).getLoaded(); + + // Find and invoke the constructor of the dynamically created class + final Constructor<? extends AbstractServer> ctor = + dynamicServerType.getDeclaredConstructor(Integer.class); + final AbstractServer dynamicServerInstance = ctor.newInstance(1); + + // Start the server process + AbstractServer.startServer(dynamicServerInstance, LoggerFactory.getLogger(serverClass)); + } + + } + + @BeforeAll + public static void beforeTests() throws Exception { + // Start MiniCluster so that getCluster() does not + // return null, + SharedMiniClusterBase.startMiniCluster(); + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); + } + + @AfterAll + public static void afterTests() { + SharedMiniClusterBase.stopMiniCluster(); + } + + // Junit doesn't support more than one parameterized + // argument to a test method. We need to generate + // the arguments here. + static Stream<Arguments> generateWorkerProcessArguments() { + List<Arguments> args = new ArrayList<>(); + for (ServerType st : ServerType.values()) { + if (st == ServerType.COMPACTOR || st == ServerType.SCAN_SERVER + || st == ServerType.TABLET_SERVER) { + for (TerminalBehavior tb : TerminalBehavior.values()) { + args.add(Arguments.of(st, tb)); + } + } + } + return args.stream(); + } + + @ParameterizedTest + @MethodSource("generateWorkerProcessArguments") + public void testWorkerProcesses(ServerType server, TerminalBehavior behavior) throws Exception { + Map<String,String> properties = new HashMap<>(); + properties.put(ProcessProxy.PROXY_CLASS, server.name()); + properties.put(ProcessProxy.PROXY_METHOD_BEHAVIOR, behavior.name()); + getCluster().getConfig().setSystemProperties(properties); + ProcessInfo pi = getCluster()._exec(ProcessProxy.class, server, Map.of(), new String[] {}); + Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000); + int exitValue = pi.getProcess().exitValue(); + if (server != ServerType.SCAN_SERVER) { + if (behavior == TerminalBehavior.SHUTDOWN) { + assertEquals(0, exitValue); + } else { + assertEquals(1, exitValue); + } Review Comment: I find this easier to read, because it's less code, but others may feel differently. ```suggestion assertEquals(behavior == TerminalBehavior.SHUTDOWN ? 0 : 1, exitValue); ``` ########## test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT_SimpleSuite.java: ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.functional.ExitCodesIT_SimpleSuite.ProcessProxy.TerminalBehavior; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.accumulo.tserver.TabletServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.ExceptionMethod; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.MethodCall; +import net.bytebuddy.matcher.ElementMatchers; + +@Tag(MINI_CLUSTER_ONLY) +public class ExitCodesIT_SimpleSuite extends SharedMiniClusterBase { + + public static class ServerContextFunction implements Function<SiteConfiguration,ServerContext> { + + @Override + public ServerContext apply(SiteConfiguration site) { + return new ServerContext(site); + } + + } + + // Dynamic Proxy class for a server instance that will be invoked via + // Accumulo's Main class using MiniAccumuloClusterImpl._exec(). This + // ProcessProxy class will determine which class to instantiate + // and what the terminal behavior should be using two system + // properties. A subclass of the desired class is dynamically created + // and started. + public static class ProcessProxy { + + public static enum TerminalBehavior { + SHUTDOWN, EXCEPTION, ERROR + }; + + public static final String PROXY_CLASS = "proxy.server"; + public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior"; + + public static void main(String[] args) throws Exception { + + final String proxyServer = System.getProperty(PROXY_CLASS); + Objects.requireNonNull(proxyServer, PROXY_CLASS + " must exist as an env var"); + final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR); + Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as an env var"); + + final ServerType st = ServerType.valueOf(proxyServer); + final TerminalBehavior behavior = TerminalBehavior.valueOf(methodBehavior); + + // Determine the constructor arguments and parameters for each server class. + // Find a method with no-args that does not return anything that is + // called during the servers run method that we can intercept to signal + // shutdown, exception, or error. + final Class<? extends AbstractServer> serverClass; + final String methodName; + final Class<?>[] ctorParams; + final Object[] ctorArgs; + switch (st) { + case COMPACTOR: + List<String> compactorArgs = new ArrayList<>(); + compactorArgs.add("-o"); + compactorArgs.add(Property.COMPACTOR_GROUP_NAME.getKey() + "=TEST"); + serverClass = Compactor.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), compactorArgs.toArray(new String[] {})}; + break; + case GARBAGE_COLLECTOR: + serverClass = SimpleGarbageCollector.class; + methodName = "logStats"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new String[] {}}; + break; + case MANAGER: + serverClass = Manager.class; + methodName = "mainWait"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), new String[] {}}; + break; + case SCAN_SERVER: + List<String> scanServerArgs = new ArrayList<>(); + scanServerArgs.add("-o"); + scanServerArgs.add(Property.SSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = ScanServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), scanServerArgs.toArray(new String[] {})}; + break; + case TABLET_SERVER: + List<String> tabletServerArgs = new ArrayList<>(); + tabletServerArgs.add("-o"); + tabletServerArgs.add(Property.TSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = TabletServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), + tabletServerArgs.toArray(new String[] {})}; + break; + case MONITOR: + case ZOOKEEPER: + default: + throw new UnsupportedOperationException(st + " is not currently supported"); + } + + final Implementation implementation; + switch (behavior) { + case ERROR: + implementation = + ExceptionMethod.throwing(StackOverflowError.class, "throwing unknown error"); + break; + case EXCEPTION: + implementation = + ExceptionMethod.throwing(RuntimeException.class, "throwing runtime exception"); + break; + case SHUTDOWN: + implementation = + MethodCall.invoke(AbstractServer.class.getMethod("requestShutdownForTests")); + break; + default: + throw new UnsupportedOperationException(behavior + " is not currently supported"); + } + + // Dynamically create the subclass with the specified behavior, load it + // into the JVM, and return it's Class. + // creates a subclass of server class defining a public constructor that + // takes an Integer, but calls the server class constructor with the + // appropriate args. Calls to the intercepted method name are intercepted + // by the supplied implementation + final Class<? extends AbstractServer> dynamicServerType = + new ByteBuddy().subclass(serverClass, ConstructorStrategy.Default.NO_CONSTRUCTORS) + .name("org.apache.accumulo.test.functional.exit_codes." + serverClass.getSimpleName() + + "_" + behavior.name()) + .defineConstructor(Visibility.PUBLIC).withParameters(Integer.class) + .intercept(MethodCall.invoke(serverClass.getDeclaredConstructor(ctorParams)).onSuper() + .with(ctorArgs)) + .method(ElementMatchers.named(methodName)).intercept(implementation).make() + .load(serverClass.getClassLoader()).getLoaded(); + + // Find and invoke the constructor of the dynamically created class + final Constructor<? extends AbstractServer> ctor = + dynamicServerType.getDeclaredConstructor(Integer.class); + final AbstractServer dynamicServerInstance = ctor.newInstance(1); + + // Start the server process + AbstractServer.startServer(dynamicServerInstance, LoggerFactory.getLogger(serverClass)); + } + + } + + @BeforeAll + public static void beforeTests() throws Exception { + // Start MiniCluster so that getCluster() does not + // return null, + SharedMiniClusterBase.startMiniCluster(); + getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR); + } + + @AfterAll + public static void afterTests() { + SharedMiniClusterBase.stopMiniCluster(); + } + + // Junit doesn't support more than one parameterized + // argument to a test method. We need to generate + // the arguments here. + static Stream<Arguments> generateWorkerProcessArguments() { + List<Arguments> args = new ArrayList<>(); + for (ServerType st : ServerType.values()) { + if (st == ServerType.COMPACTOR || st == ServerType.SCAN_SERVER + || st == ServerType.TABLET_SERVER) { + for (TerminalBehavior tb : TerminalBehavior.values()) { + args.add(Arguments.of(st, tb)); + } + } + } + return args.stream(); + } + + @ParameterizedTest + @MethodSource("generateWorkerProcessArguments") + public void testWorkerProcesses(ServerType server, TerminalBehavior behavior) throws Exception { + Map<String,String> properties = new HashMap<>(); + properties.put(ProcessProxy.PROXY_CLASS, server.name()); + properties.put(ProcessProxy.PROXY_METHOD_BEHAVIOR, behavior.name()); + getCluster().getConfig().setSystemProperties(properties); + ProcessInfo pi = getCluster()._exec(ProcessProxy.class, server, Map.of(), new String[] {}); + Wait.waitFor(() -> !pi.getProcess().isAlive(), 120_000); + int exitValue = pi.getProcess().exitValue(); + if (server != ServerType.SCAN_SERVER) { Review Comment: Please swap around this if/else, so we're not checking a negative. It makes it easier to read when checking `if ==, then X, else Y` rather than `if !=, then Y, else X`. ########## core/src/main/java/org/apache/accumulo/core/lock/ServiceLockSupport.java: ########## @@ -111,11 +111,11 @@ public synchronized void failedToAcquireLock(Exception e) { String msg = "Failed to acquire " + server + " lock due to incorrect ZooKeeper authentication."; LOG.error("{} Ensure instance.secret is consistent across Accumulo configuration", msg, e); - Halt.halt(-1, msg); + Halt.halt(1, msg); } if (acquiredLock) { - Halt.halt(-1, + Halt.halt(1, Review Comment: :+1: to using positive numbers. ########## core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java: ########## @@ -918,21 +918,26 @@ public AuthenticationToken token() { @Override public synchronized void close() { if (closed.compareAndSet(false, true)) { - if (zooCacheCreated.get()) { - zooCache.get().close(); - } - if (zooKeeperOpened.get()) { - zooSession.get().close(); - } if (thriftTransportPool != null) { + log.info("Closing Thrift Transport Pool"); thriftTransportPool.shutdown(); } if (scannerReadaheadPool != null) { + log.info("Closing Scanner ReadAhead Pool"); scannerReadaheadPool.shutdownNow(); // abort all tasks, client is shutting down } if (cleanupThreadPool != null) { + log.info("Closing Cleanup ThreadPool"); cleanupThreadPool.shutdown(); // wait for shutdown tasks to execute } + if (zooCacheCreated.get()) { + log.info("Closing ZooCache"); Review Comment: I think these log messages could be at debug. They're pretty noisy, and it's not like we log at INFO when we create them. They are all internal resources. The user doesn't necessarily care to see all this internal thread cleanup for the normal case. ########## server/manager/src/main/java/org/apache/accumulo/manager/Manager.java: ########## @@ -1206,7 +1206,7 @@ boolean canSuspendTablets() { break; } try { - Thread.sleep(500); + mainWait(); } catch (InterruptedException e) { log.info("Interrupt Exception received, shutting down"); gracefulShutdown(context.rpcCreds()); Review Comment: Does this mean we get a graceful shutdown if we do Ctrl-C on the process while it's sleeping here? That doesn't seem quite right, but maybe that's the best option? I'm curious why it was done this way. ########## test/src/main/java/org/apache/accumulo/test/functional/ExitCodesIT_SimpleSuite.java: ########## @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test.functional; + +import static org.apache.accumulo.harness.AccumuloITBase.MINI_CLUSTER_ONLY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.apache.accumulo.compactor.Compactor; +import org.apache.accumulo.core.cli.ConfigOpts; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.conf.SiteConfiguration; +import org.apache.accumulo.gc.SimpleGarbageCollector; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.manager.Manager; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo; +import org.apache.accumulo.server.AbstractServer; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.test.functional.ExitCodesIT_SimpleSuite.ProcessProxy.TerminalBehavior; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.accumulo.tserver.TabletServer; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.LoggerFactory; + +import net.bytebuddy.ByteBuddy; +import net.bytebuddy.description.modifier.Visibility; +import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy; +import net.bytebuddy.implementation.ExceptionMethod; +import net.bytebuddy.implementation.Implementation; +import net.bytebuddy.implementation.MethodCall; +import net.bytebuddy.matcher.ElementMatchers; + +@Tag(MINI_CLUSTER_ONLY) +public class ExitCodesIT_SimpleSuite extends SharedMiniClusterBase { + + public static class ServerContextFunction implements Function<SiteConfiguration,ServerContext> { + + @Override + public ServerContext apply(SiteConfiguration site) { + return new ServerContext(site); + } + + } + + // Dynamic Proxy class for a server instance that will be invoked via + // Accumulo's Main class using MiniAccumuloClusterImpl._exec(). This + // ProcessProxy class will determine which class to instantiate + // and what the terminal behavior should be using two system + // properties. A subclass of the desired class is dynamically created + // and started. + public static class ProcessProxy { + + public static enum TerminalBehavior { + SHUTDOWN, EXCEPTION, ERROR + }; + + public static final String PROXY_CLASS = "proxy.server"; + public static final String PROXY_METHOD_BEHAVIOR = "proxy.method.behavior"; + + public static void main(String[] args) throws Exception { + + final String proxyServer = System.getProperty(PROXY_CLASS); + Objects.requireNonNull(proxyServer, PROXY_CLASS + " must exist as an env var"); + final String methodBehavior = System.getProperty(PROXY_METHOD_BEHAVIOR); + Objects.requireNonNull(methodBehavior, methodBehavior + " must exist as an env var"); + + final ServerType st = ServerType.valueOf(proxyServer); + final TerminalBehavior behavior = TerminalBehavior.valueOf(methodBehavior); + + // Determine the constructor arguments and parameters for each server class. + // Find a method with no-args that does not return anything that is + // called during the servers run method that we can intercept to signal + // shutdown, exception, or error. + final Class<? extends AbstractServer> serverClass; + final String methodName; + final Class<?>[] ctorParams; + final Object[] ctorArgs; + switch (st) { + case COMPACTOR: + List<String> compactorArgs = new ArrayList<>(); + compactorArgs.add("-o"); + compactorArgs.add(Property.COMPACTOR_GROUP_NAME.getKey() + "=TEST"); + serverClass = Compactor.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), compactorArgs.toArray(new String[] {})}; + break; + case GARBAGE_COLLECTOR: + serverClass = SimpleGarbageCollector.class; + methodName = "logStats"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new String[] {}}; + break; + case MANAGER: + serverClass = Manager.class; + methodName = "mainWait"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), new String[] {}}; + break; + case SCAN_SERVER: + List<String> scanServerArgs = new ArrayList<>(); + scanServerArgs.add("-o"); + scanServerArgs.add(Property.SSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = ScanServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), scanServerArgs.toArray(new String[] {})}; + break; + case TABLET_SERVER: + List<String> tabletServerArgs = new ArrayList<>(); + tabletServerArgs.add("-o"); + tabletServerArgs.add(Property.TSERV_GROUP_NAME.getKey() + "=TEST"); + serverClass = TabletServer.class; + methodName = "updateIdleStatus"; + ctorParams = new Class<?>[] {ConfigOpts.class, Function.class, String[].class}; + ctorArgs = new Object[] {new ConfigOpts(), new ServerContextFunction(), Review Comment: Can't you inline the ServerContextFunction like the server processes normally do, as in: ```suggestion ctorArgs = new Object[] {new ConfigOpts(), ServerContext::new, ``` ########## server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java: ########## @@ -352,15 +352,12 @@ public void run() { gracefulShutdown(getContext().rpcCreds()); } } - super.close(); + // Must set shutdown as completed before calling super.close(). + // super.close() calls ServerContext.close() -> + // ClientContext.close() -> ZooSession.close() which removes + // all of the ephemeral nodes and forces the watches to fire. getShutdownComplete().set(true); - log.info("stop requested. exiting ... "); - try { - gcLock.unlock(); - } catch (Exception e) { - log.warn("Failed to release GarbageCollector lock", e); - } - + super.close(); Review Comment: Is that something that we always do together? If so, I wonder if it can be done inside the close, or if the getShutdownComplete() stuff is needed at all. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
