Repository: incubator-zeppelin Updated Branches: refs/heads/master 16e921b6a -> 4c269e6d8
ZEPPELIN-312: fix a bug with blocking websocket broadcast ### What is this PR for? Replacing synchronization through critical section over the collection of sockets with the lock-free collection implementation `java.util.concurrent.ConcurrentLinkedQueue`. Synchronization was used to avoid parallel collection modifications, as the calls `.sendMessage()` in Jetty implementation of Websockets are thread-safe and can proceed concurrently. ### What type of PR is it? Bug Fix ### Is there a relevant Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-312 ### How should this be tested? See JIRA ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Alexander Bezzubov <[email protected]> Closes #558 from bzz/fix/zeppelin-312-blocking-broadcast and squashes the following commits: bbbf8ae [Alexander Bezzubov] ZEPPELIN-312: refactoring ZeppelinServer to better Java style naming conventions 497a6ca [Alexander Bezzubov] ZEPPELIN-312: replace sync \w lock-free collection 524c401 [Alexander Bezzubov] ZEPPELIN-312: refactoring ZeppelinServer to adhere Java naming conventions Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/4c269e6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/4c269e6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/4c269e6d Branch: refs/heads/master Commit: 4c269e6d860320e2612eb6b77785c6d1ff3ef106 Parents: 16e921b Author: Alexander Bezzubov <[email protected]> Authored: Tue Dec 22 13:07:43 2015 +0900 Committer: Alexander Bezzubov <[email protected]> Committed: Tue Dec 22 16:29:21 2015 +0900 ---------------------------------------------------------------------- .../apache/zeppelin/server/ZeppelinServer.java | 89 +++++++++----------- .../apache/zeppelin/socket/NotebookServer.java | 32 +++---- .../zeppelin/rest/AbstractTestRestApi.java | 10 ++- .../zeppelin/socket/NotebookServerTest.java | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 6 +- 5 files changed, 64 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index e0e4a5d..7286b35 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -59,54 +59,65 @@ import org.slf4j.LoggerFactory; * Main class of Zeppelin. * */ - public class ZeppelinServer extends Application { private static final Logger LOG = LoggerFactory.getLogger(ZeppelinServer.class); - private SchedulerFactory schedulerFactory; public static Notebook notebook; - public static NotebookServer notebookServer; - public static Server jettyServer; + public static Server jettyWebServer; + public static NotebookServer notebookWsServer; + private SchedulerFactory schedulerFactory; private InterpreterFactory replFactory; private NotebookRepo notebookRepo; - public static void main(String[] args) throws Exception { + public ZeppelinServer() throws Exception { + LOG.info("Constructor starteds"); ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - conf.setProperty("args", args); - jettyServer = setupJettyServer(conf); + this.schedulerFactory = new SchedulerFactory(); + this.replFactory = new InterpreterFactory(conf, notebookWsServer); + this.notebookRepo = new NotebookRepoSync(conf); + + notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookWsServer); + LOG.info("Constructor finished"); + } + + public static void main(String[] args) throws InterruptedException { + ZeppelinConfiguration conf = ZeppelinConfiguration.create(); + conf.setProperty("args", args); // REST api - final ServletContextHandler restApi = setupRestApiContextHandler(conf); + final ServletContextHandler restApiContext = setupRestApiContextHandler(conf); // Notebook server - final ServletContextHandler notebook = setupNotebookServer(conf); + final ServletContextHandler notebookContext = setupNotebookServer(conf); // Web UI final WebAppContext webApp = setupWebAppContext(conf); // add all handlers ContextHandlerCollection contexts = new ContextHandlerCollection(); - contexts.setHandlers(new Handler[]{restApi, notebook, webApp}); - jettyServer.setHandler(contexts); + contexts.setHandlers(new Handler[]{restApiContext, notebookContext, webApp}); - LOG.info("Start zeppelin server"); + jettyWebServer = setupJettyServer(conf); + jettyWebServer.setHandler(contexts); + + LOG.info("Starting zeppelin server"); try { - jettyServer.start(); + jettyWebServer.start(); //Instantiates ZeppelinServer } catch (Exception e) { LOG.error("Error while running jettyServer", e); System.exit(-1); } - LOG.info("Started zeppelin server"); + LOG.info("Done, zeppelin server started"); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { LOG.info("Shutting down Zeppelin Server ... "); try { - jettyServer.stop(); - ZeppelinServer.notebook.getInterpreterFactory().close(); - ZeppelinServer.notebook.close(); + jettyWebServer.stop(); + notebook.getInterpreterFactory().close(); + notebook.close(); } catch (Exception e) { LOG.error("Error while stopping servlet container", e); } @@ -125,18 +136,15 @@ public class ZeppelinServer extends Application { System.exit(0); } - jettyServer.join(); + jettyWebServer.join(); ZeppelinServer.notebook.getInterpreterFactory().close(); } - private static Server setupJettyServer(ZeppelinConfiguration conf) - throws Exception { - + private static Server setupJettyServer(ZeppelinConfiguration conf) { AbstractConnector connector; if (conf.useSsl()) { connector = new SslSelectChannelConnector(getSslContextFactory(conf)); - } - else { + } else { connector = new SelectChannelConnector(); } @@ -153,11 +161,9 @@ public class ZeppelinServer extends Application { return server; } - private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) - throws Exception { - - notebookServer = new NotebookServer(); - final ServletHolder servletHolder = new ServletHolder(notebookServer); + private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) { + notebookWsServer = new NotebookServer(); + final ServletHolder servletHolder = new ServletHolder(notebookWsServer); servletHolder.setInitParameter("maxTextMessageSize", "1024000"); final ServletContextHandler cxfContext = new ServletContextHandler( @@ -171,9 +177,8 @@ public class ZeppelinServer extends Application { return cxfContext; } - private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) - throws Exception { - + @SuppressWarnings("deprecation") + private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) { // Note that the API for the SslContextFactory is different for // Jetty version 9 SslContextFactory sslContextFactory = new SslContextFactory(); @@ -194,6 +199,7 @@ public class ZeppelinServer extends Application { return sslContextFactory; } + @SuppressWarnings("unused") //TODO(bzz) why unused? private static SSLContext getSslContext(ZeppelinConfiguration conf) throws Exception { @@ -240,23 +246,10 @@ public class ZeppelinServer extends Application { webApp.setTempDirectory(warTempDirectory); } // Explicit bind to root - webApp.addServlet( - new ServletHolder(new DefaultServlet()), - "/*" - ); + webApp.addServlet(new ServletHolder(new DefaultServlet()), "/*"); return webApp; } - public ZeppelinServer() throws Exception { - ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - - this.schedulerFactory = new SchedulerFactory(); - - this.replFactory = new InterpreterFactory(conf, notebookServer); - this.notebookRepo = new NotebookRepoSync(conf); - notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer); - } - @Override public Set<Class<?>> getClasses() { Set<Class<?>> classes = new HashSet<Class<?>>(); @@ -264,14 +257,14 @@ public class ZeppelinServer extends Application { } @Override - public java.util.Set<java.lang.Object> getSingletons() { - Set<Object> singletons = new HashSet<Object>(); + public Set<Object> getSingletons() { + Set<Object> singletons = new HashSet<>(); /** Rest-api root endpoint */ ZeppelinRestApi root = new ZeppelinRestApi(); singletons.add(root); - NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookServer); + NotebookRestApi notebookApi = new NotebookRestApi(notebook, notebookWsServer); singletons.add(notebookApi); InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 114582f..a010e58 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; + import javax.servlet.http.HttpServletRequest; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -44,12 +46,12 @@ import org.eclipse.jetty.websocket.WebSocketServlet; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import com.google.common.base.Strings; import com.google.gson.Gson; /** * Zeppelin websocket service. * - * @author anthonycorbacho */ public class NotebookServer extends WebSocketServlet implements NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { @@ -57,7 +59,7 @@ public class NotebookServer extends WebSocketServlet implements .getLogger(NotebookServer.class); Gson gson = new Gson(); final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>(); - final List<NotebookSocket> connectedSockets = new LinkedList<>(); + final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>(); private Notebook notebook() { return ZeppelinServer.notebook; @@ -85,9 +87,7 @@ public class NotebookServer extends WebSocketServlet implements public void onOpen(NotebookSocket conn) { LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), conn.getRequest().getRemotePort()); - synchronized (connectedSockets) { - connectedSockets.add(conn); - } + connectedSockets.add(conn); } @Override @@ -147,8 +147,7 @@ public class NotebookServer extends WebSocketServlet implements completion(conn, notebook, messagereceived); break; case PING: - pong(); - break; + break; //do nothing case ANGULAR_OBJECT_UPDATED: angularObjectUpdated(conn, notebook, messagereceived); break; @@ -166,9 +165,7 @@ public class NotebookServer extends WebSocketServlet implements LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); - synchronized (connectedSockets) { - connectedSockets.remove(conn); - } + connectedSockets.remove(conn); } protected Message deserializeMessage(String msg) { @@ -285,13 +282,11 @@ public class NotebookServer extends WebSocketServlet implements } private void broadcastAll(Message m) { - synchronized (connectedSockets) { - for (NotebookSocket conn : connectedSockets) { - try { - conn.send(serializeMessage(m)); - } catch (IOException e) { - LOG.error("socket error", e); - } + for (NotebookSocket conn : connectedSockets) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); } } } @@ -730,6 +725,7 @@ public class NotebookServer extends WebSocketServlet implements public static class ParagraphJobListener implements JobListener { private NotebookServer notebookServer; private Note note; + public ParagraphJobListener(NotebookServer notebookServer, Note note) { this.notebookServer = notebookServer; this.note = note; @@ -771,8 +767,6 @@ public class NotebookServer extends WebSocketServlet implements public JobListener getParagraphJobListener(Note note) { return new ParagraphJobListener(this, note); } - private void pong() { - } private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { List<InterpreterSetting> settings = note.getNoteReplLoader() http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index db7affe..69d1022 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -29,8 +29,12 @@ import java.util.concurrent.Executors; import org.apache.commons.httpclient.HttpClient; import org.apache.commons.httpclient.HttpMethodBase; -import org.apache.commons.httpclient.methods.*; -import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.commons.httpclient.methods.ByteArrayRequestEntity; +import org.apache.commons.httpclient.methods.DeleteMethod; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.PutMethod; +import org.apache.commons.httpclient.methods.RequestEntity; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterSetting; @@ -207,7 +211,7 @@ public abstract class AbstractTestRestApi { } LOG.info("Terminating test Zeppelin..."); - ZeppelinServer.jettyServer.stop(); + ZeppelinServer.jettyWebServer.stop(); executor.shutdown(); long s = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java index faef287..67d12b7 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTest.java @@ -60,7 +60,7 @@ public class NotebookServerTest extends AbstractTestRestApi { AbstractTestRestApi.startUp(); gson = new Gson(); notebook = ZeppelinServer.notebook; - notebookServer = ZeppelinServer.notebookServer; + notebookServer = ZeppelinServer.notebookWsServer; } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4c269e6d/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 909345a..72b6a3c 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -18,12 +18,12 @@ package org.apache.zeppelin.conf; import java.net.URL; -import java.util.*; +import java.util.Arrays; +import java.util.List; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.configuration.tree.ConfigurationNode; -import org.apache.zeppelin.notebook.repo.S3NotebookRepo; import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +31,6 @@ import org.slf4j.LoggerFactory; /** * Zeppelin configuration. * - * @author Leemoonsoo - * */ public class ZeppelinConfiguration extends XMLConfiguration { private static final String ZEPPELIN_SITE_XML = "zeppelin-site.xml";
