This is an automated email from the ASF dual-hosted git repository.

pdallig pushed a commit to branch surefire
in repository https://gitbox.apache.org/repos/asf/zeppelin.git

commit a0aa4fb9ac796b4371472758dc17ceec88765aa4
Author: Philipp Dallig <philipp.dal...@gmail.com>
AuthorDate: Fri Mar 11 17:30:50 2022 +0100

    Try to solve surefire update
---
 .../zeppelin/server/ImmediateErrorHandlerImpl.java |  14 +-
 .../org/apache/zeppelin/server/ZeppelinServer.java | 141 +++++++++++----------
 .../zeppelin/socket/SessionConfigurator.java       |   3 +-
 .../java/org/apache/zeppelin/utils/TestUtils.java  |   4 +-
 .../zeppelin/cluster/ZeppelinServerMock.java       |   3 +-
 .../org/apache/zeppelin/recovery/RecoveryTest.java |   3 +-
 .../apache/zeppelin/rest/AbstractTestRestApi.java  |   8 +-
 .../apache/zeppelin/rest/NotebookRestApiTest.java  |   4 +-
 8 files changed, 93 insertions(+), 87 deletions(-)

diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ImmediateErrorHandlerImpl.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ImmediateErrorHandlerImpl.java
index 5d0ec17..27f4e6d 100644
--- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ImmediateErrorHandlerImpl.java
+++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ImmediateErrorHandlerImpl.java
@@ -33,25 +33,25 @@ public class ImmediateErrorHandlerImpl implements ImmediateErrorHandler {
   @Override
   public void postConstructFailed(ActiveDescriptor<?> immediateService, Throwable exception) {
-    synchronized (this) {
+    synchronized (constructionErrors) {
       constructionErrors.add(new ErrorData(immediateService, exception));
-      this.notifyAll();
+      constructionErrors.notifyAll();
     }
   }
   @Override
   public void preDestroyFailed(ActiveDescriptor<?> immediateService, Throwable exception) {
-    synchronized (this) {
+    synchronized (constructionErrors) {
       destructionErrors.add(new ErrorData(immediateService, exception));
-      this.notifyAll();
+      constructionErrors.notifyAll();
     }
   }
   List<ErrorData> waitForAtLeastOneConstructionError(long waitTime) throws InterruptedException {
-    synchronized (this) {
-      while (constructionErrors.size() <= 0 && waitTime > 0) {
+    synchronized (constructionErrors) {
+
while (constructionErrors.isEmpty() && waitTime > 0) { long currentTime = System.currentTimeMillis(); - wait(waitTime); + constructionErrors.wait(waitTime); long elapsedTime = System.currentTimeMillis() - currentTime; waitTime -= elapsedTime; } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 428cdd9..836257f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -135,30 +135,26 @@ public class ZeppelinServer extends ResourceConfig { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class); private static final String WEB_APP_CONTEXT_NEXT = "/next"; - public static Server jettyWebServer; - public static ServiceLocator sharedServiceLocator; - - private static ZeppelinConfiguration conf; - private static PrometheusMeterRegistry promMetricRegistry; - - public static void reset() { - conf = null; - jettyWebServer = null; - sharedServiceLocator = null; - } + public static final String SERVICE_LOCATOR_NAME= "shared-locator"; @Inject - public ZeppelinServer() { + public ZeppelinServer(ZeppelinConfiguration conf) { LOG.info("Instantiated ZeppelinServer"); InterpreterOutput.LIMIT = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT); packages("org.apache.zeppelin.rest"); } - public static void main(String[] args) throws InterruptedException, IOException { - ZeppelinServer.conf = ZeppelinConfiguration.create(); + public static void main(String[] args) throws IOException { + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - jettyWebServer = setupJettyServer(conf); + Server jettyWebServer = setupJettyServer(conf); + + PrometheusMeterRegistry promMetricRegistry = null; + if (conf.isPrometheusMetricEnabled()) { + promMetricRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + 
Metrics.addRegistry(promMetricRegistry); + } initMetrics(conf); TimedHandler timedHandler = new TimedHandler(Metrics.globalRegistry, Tags.empty()); @@ -167,7 +163,7 @@ public class ZeppelinServer extends ResourceConfig { ContextHandlerCollection contexts = new ContextHandlerCollection(); timedHandler.setHandler(contexts); - sharedServiceLocator = ServiceLocatorFactory.getInstance().create("shared-locator"); + ServiceLocator sharedServiceLocator = ServiceLocatorFactory.getInstance().create(SERVICE_LOCATOR_NAME); ServiceLocatorUtilities.enableImmediateScope(sharedServiceLocator); ServiceLocatorUtilities.addClasses(sharedServiceLocator, NotebookRepoSync.class, @@ -230,10 +226,10 @@ public class ZeppelinServer extends ResourceConfig { final WebAppContext defaultWebApp = setupWebAppContext(contexts, conf, conf.getString(ConfVars.ZEPPELIN_WAR), conf.getServerContextPath()); final WebAppContext nextWebApp = setupWebAppContext(contexts, conf, conf.getString(ConfVars.ZEPPELIN_ANGULAR_WAR), WEB_APP_CONTEXT_NEXT); - initWebApp(defaultWebApp); - initWebApp(nextWebApp); + initWebApp(defaultWebApp, conf, sharedServiceLocator, promMetricRegistry); + initWebApp(nextWebApp, conf, sharedServiceLocator, promMetricRegistry); // Cluster Manager Server - setupClusterManagerServer(sharedServiceLocator); + setupClusterManagerServer(sharedServiceLocator, conf); // JMX Enable if (conf.isJMXEnabled()) { @@ -251,25 +247,9 @@ public class ZeppelinServer extends ResourceConfig { LOG.info("JMX Enabled with port: {}", port); } - LOG.info("Starting zeppelin server"); - try { - jettyWebServer.start(); // Instantiates ZeppelinServer - List<ErrorData> errorData = handler.waitForAtLeastOneConstructionError(5 * 1000); - if(errorData.size() > 0 && errorData.get(0).getThrowable() != null) { - throw new Exception(errorData.get(0).getThrowable()); - } - if (conf.getJettyName() != null) { - org.eclipse.jetty.http.HttpGenerator.setJettyVersion(conf.getJettyName()); - } - } catch (Exception e) { - 
LOG.error("Error while running jettyServer", e); - System.exit(-1); - } - LOG.info("Done, zeppelin server started"); - - runNoteOnStart(conf); + runNoteOnStart(conf, jettyWebServer, sharedServiceLocator); - Runtime.getRuntime().addShutdownHook(shutdown(conf)); + Runtime.getRuntime().addShutdownHook(shutdown(conf, jettyWebServer, sharedServiceLocator)); // Try to get Notebook from ServiceLocator, because Notebook instantiation is lazy, it is // created when user open zeppelin in browser if we don't get it explicitly here. @@ -283,28 +263,47 @@ public class ZeppelinServer extends ResourceConfig { // Try to recover here, don't do it in constructor of Notebook, because it would cause deadlock. notebook.recoveryIfNecessary(); - // when zeppelin is started inside of ide (especially for eclipse) - // for graceful shutdown, input any key in console window - if (System.getenv("ZEPPELIN_IDENT_STRING") == null) { - try { - System.in.read(); - } catch (IOException e) { - LOG.error("Exception in ZeppelinServer while main ", e); + + try { + List<ErrorData> errorDatas = handler.waitForAtLeastOneConstructionError(5000); + for (ErrorData errorData : errorDatas) { + LOG.error("Error in Construction", errorData.getThrowable()); + } + if (!errorDatas.isEmpty()) { + LOG.error("{} error(s) while starting - Termination", errorDatas.size()); + System.exit(-1); + } + } catch (InterruptedException e) { + LOG.error("Error while Constrcut Services", e); + Thread.currentThread().interrupt(); + System.exit(-1); + } + + + LOG.info("Starting zeppelin server"); + try { + jettyWebServer.start(); // Instantiates ZeppelinServer + if (conf.getJettyName() != null) { + org.eclipse.jetty.http.HttpGenerator.setJettyVersion(conf.getJettyName()); } - System.exit(0); + } catch (Exception e) { + LOG.error("Error while running jettyServer", e); + System.exit(-1); } + LOG.info("Done, zeppelin server started"); - jettyWebServer.join(); + try { + jettyWebServer.join(); + } catch (InterruptedException e) { + 
shutdown(conf, jettyWebServer, sharedServiceLocator).start(); + Thread.currentThread().interrupt(); + } if (!conf.isRecoveryEnabled()) { sharedServiceLocator.getService(InterpreterSettingManager.class).close(); } } private static void initMetrics(ZeppelinConfiguration conf) { - if (conf.isPrometheusMetricEnabled()) { - promMetricRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); - Metrics.addRegistry(promMetricRegistry); - } if (conf.isJMXEnabled()) { Metrics.addRegistry(new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM)); } @@ -317,7 +316,7 @@ public class ZeppelinServer extends ResourceConfig { new JVMInfoBinder().bindTo(Metrics.globalRegistry); } - private static Thread shutdown(ZeppelinConfiguration conf) { + private static Thread shutdown(ZeppelinConfiguration conf, Server jettyWebServer, ServiceLocator sharedServiceLocator) { return new Thread( () -> { LOG.info("Shutting down Zeppelin Server ... "); @@ -346,10 +345,10 @@ public class ZeppelinServer extends ResourceConfig { conf.getInt(ConfVars.ZEPPELIN_SERVER_JETTY_THREAD_POOL_MIN), conf.getInt(ConfVars.ZEPPELIN_SERVER_JETTY_THREAD_POOL_TIMEOUT)); final Server server = new Server(threadPool); - initServerConnector(server, conf.getServerPort(), conf.getServerSslPort()); + initServerConnector(server, conf); return server; } - private static void initServerConnector(Server server, int port, int sslPort) { + private static void initServerConnector(Server server, ZeppelinConfiguration conf) { ServerConnector connector; HttpConfiguration httpConfig = new HttpConfiguration(); @@ -357,9 +356,9 @@ public class ZeppelinServer extends ResourceConfig { httpConfig.setSendServerVersion(conf.sendJettyName()); httpConfig.setRequestHeaderSize(conf.getJettyRequestHeaderSize()); if (conf.useSsl()) { - LOG.debug("Enabling SSL for Zeppelin Server on port {}", sslPort); + LOG.debug("Enabling SSL for Zeppelin Server on port {}", conf.getServerSslPort()); httpConfig.setSecureScheme(HttpScheme.HTTPS.asString()); - 
httpConfig.setSecurePort(sslPort); + httpConfig.setSecurePort(conf.getServerSslPort()); HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig); httpsConfig.addCustomizer(new SecureRequestCustomizer()); @@ -371,7 +370,7 @@ public class ZeppelinServer extends ResourceConfig { server, sslConnectionFactory, httpsConnectionFactory); - connector.setPort(sslPort); + connector.setPort(conf.getServerSslPort()); connector.addBean(new JettySslHandshakeMetrics(Metrics.globalRegistry, Tags.empty())); } else { HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(httpConfig); @@ -379,7 +378,7 @@ public class ZeppelinServer extends ResourceConfig { new ServerConnector( server, httpConnectionFactory); - connector.setPort(port); + connector.setPort(conf.getServerPort()); } // Set some timeout options to make debugging easier. int timeout = 1000 * 30; @@ -389,7 +388,7 @@ public class ZeppelinServer extends ResourceConfig { server.addConnector(connector); } - private static void runNoteOnStart(ZeppelinConfiguration conf) throws IOException, InterruptedException { + private static void runNoteOnStart(ZeppelinConfiguration conf, Server jettyWebServer, ServiceLocator sharedServiceLocator) throws IOException { String noteIdToRun = conf.getNotebookRunId(); if (!StringUtils.isEmpty(noteIdToRun)) { LOG.info("Running note {} on start", noteIdToRun); @@ -422,16 +421,20 @@ public class ZeppelinServer extends ResourceConfig { }); if (conf.getNotebookRunAutoShutdown()) { - Thread t = shutdown(conf); + Thread t = shutdown(conf, jettyWebServer, sharedServiceLocator); t.start(); - t.join(); + try { + t.join(); + } catch (InterruptedException e) { + LOG.error("Interrupt during shutdown - immediate termination"); + System.exit(1); + } System.exit(success ? 
0 : 1); } } } - private static void setupNotebookServer( - WebAppContext webapp, ZeppelinConfiguration conf, ServiceLocator serviceLocator) { + private static void setupNotebookServer(WebAppContext webapp, ZeppelinConfiguration conf) { String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize(); WebSocketServerContainerInitializer .configure(webapp, (servletContext, wsContainer) -> { @@ -440,7 +443,7 @@ public class ZeppelinServer extends ResourceConfig { }); } - private static void setupClusterManagerServer(ServiceLocator serviceLocator) { + private static void setupClusterManagerServer(ServiceLocator serviceLocator, ZeppelinConfiguration conf) { if (conf.isClusterMode()) { LOG.info("Cluster mode is enabled, starting ClusterManagerServer"); ClusterManagerServer clusterManagerServer = ClusterManagerServer.getInstance(conf); @@ -458,7 +461,7 @@ public class ZeppelinServer extends ResourceConfig { // when the zeppelin service starts, Create a ClusterInterpreterLauncher object, // This allows the ClusterInterpreterLauncher to listen for cluster events. 
try { - InterpreterSettingManager intpSettingManager = sharedServiceLocator.getService(InterpreterSettingManager.class); + InterpreterSettingManager intpSettingManager = serviceLocator.getService(InterpreterSettingManager.class); RecoveryStorage recoveryStorage = ReflectionUtils.createClazzInstance( conf.getRecoveryStorageClass(), new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, @@ -567,7 +570,7 @@ public class ZeppelinServer extends ResourceConfig { } } - private static void setupPrometheusContextHandler(WebAppContext webapp) { + private static void setupPrometheusContextHandler(WebAppContext webapp, PrometheusMeterRegistry promMetricRegistry) { webapp.addServlet(new ServletHolder(new PrometheusServlet(promMetricRegistry)), "/metrics"); } @@ -647,7 +650,7 @@ public class ZeppelinServer extends ResourceConfig { }; } - private static void initWebApp(WebAppContext webApp) { + private static void initWebApp(WebAppContext webApp, ZeppelinConfiguration conf, ServiceLocator sharedServiceLocator, PrometheusMeterRegistry promMetricRegistry) { webApp.addEventListener( new ServletContextListener() { @Override @@ -665,11 +668,13 @@ public class ZeppelinServer extends ResourceConfig { setupRestApiContextHandler(webApp, conf); // prometheus endpoint - setupPrometheusContextHandler(webApp); + if (promMetricRegistry != null) { + setupPrometheusContextHandler(webApp, promMetricRegistry); + } // health endpoints setupHealthCheckContextHandler(webApp); // Notebook server - setupNotebookServer(webApp, conf, sharedServiceLocator); + setupNotebookServer(webApp, conf); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SessionConfigurator.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SessionConfigurator.java index e6f6727..b6bd59e 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SessionConfigurator.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SessionConfigurator.java @@ -25,6 
+25,7 @@ import javax.websocket.server.ServerEndpointConfig; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.util.WatcherSecurityKey; import org.apache.zeppelin.utils.CorsUtils; +import org.glassfish.hk2.api.ServiceLocatorFactory; /** * This class set headers to websocket sessions and inject hk2 when initiating instances by ServerEndpoint annotation. @@ -44,6 +45,6 @@ public class SessionConfigurator extends ServerEndpointConfig.Configurator { @Override public <T> T getEndpointInstance(Class<T> endpointClass) throws InstantiationException { - return ZeppelinServer.sharedServiceLocator.getService(endpointClass); + return ServiceLocatorFactory.getInstance().find("shared-locator").getService(endpointClass); } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/TestUtils.java b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/TestUtils.java index 930817b..0172759 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/utils/TestUtils.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/utils/TestUtils.java @@ -25,12 +25,12 @@ import org.glassfish.hk2.api.ServiceLocatorFactory; public class TestUtils { public static <T> T getInstance(Class<T> clazz) { checkCalledByTestMethod(); - return getInstance(ZeppelinServer.sharedServiceLocator, clazz); + return getInstance(ServiceLocatorFactory.getInstance().find(ZeppelinServer.SERVICE_LOCATOR_NAME), clazz); } public static void clearInstances() { checkCalledByTestMethod(); - ServiceLocatorFactory.getInstance().destroy("shared-locator"); + ServiceLocatorFactory.getInstance().destroy(ZeppelinServer.SERVICE_LOCATOR_NAME); } static <T> T getInstance(ServiceLocator serviceLocator, Class<T> clazz) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java index fa6d859..4ea51ef 100644 --- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/cluster/ZeppelinServerMock.java @@ -145,8 +145,8 @@ public class ZeppelinServerMock { } } LOG.info("ZeppelinServerMock shutDown..."); - ZeppelinServer.jettyWebServer.stop(); executor.shutdown(); + executor.shutdownNow(); System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName()); System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_WAR.getVarName()); System.clearProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName()); @@ -178,7 +178,6 @@ public class ZeppelinServerMock { } PluginManager.reset(); ZeppelinConfiguration.reset(); - ZeppelinServer.reset(); } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java index fb51806..ec7968f 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/recovery/RecoveryTest.java @@ -35,6 +35,7 @@ import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.utils.TestUtils; +import org.glassfish.hk2.api.ServiceLocatorFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -66,7 +67,7 @@ public class RecoveryTest extends AbstractTestRestApi { recoveryDir.getAbsolutePath()); startUp(RecoveryTest.class.getSimpleName()); - notebook = ZeppelinServer.sharedServiceLocator.getService(Notebook.class); + notebook = ServiceLocatorFactory.getInstance().find("shared-locator").getService(Notebook.class); } @After diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 912183d..46d1bb2 100644 --- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -326,10 +326,11 @@ public abstract class AbstractTestRestApi { FileUtils.deleteQuietly(shiroIni); } LOG.info("Terminating Zeppelin Server..."); - ZeppelinServer.jettyWebServer.stop(); + //ZeppelinServer.jettyWebServer.stop(); executor.shutdown(); - PluginManager.reset(); - ZeppelinConfiguration.reset(); + executor.shutdownNow(); + //PluginManager.reset(); + //ZeppelinConfiguration.reset(); long s = System.currentTimeMillis(); boolean started = true; @@ -357,7 +358,6 @@ public abstract class AbstractTestRestApi { FileUtils.deleteDirectory(confDir); } TestUtils.clearInstances(); - ZeppelinServer.reset(); } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java index dc95df5..4178919 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/NotebookRestApiTest.java @@ -120,7 +120,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi { } } } - + @Test public void testGetNoteByPath() throws IOException { LOG.info("Running testGetNoteByPath"); @@ -890,7 +890,7 @@ public class NotebookRestApiTest extends AbstractTestRestApi { new TypeToken<Map<String, Object>>() {}.getType()); assertEquals("OK", resp.get("status")); post2.close(); - + Thread.sleep(60000); TestUtils.getInstance(Notebook.class).processNote(note1Id, note1 -> { Paragraph p1 = note1.getParagraph(0);