Repository: incubator-zeppelin Updated Branches: refs/heads/master b9583c6e0 -> 3a42a28b0
ZEPPELIN-172 Websocket connection without separate port This PR fixes https://issues.apache.org/jira/browse/ZEPPELIN-172 Author: Lee moon soo <[email protected]> Author: Sjoerd Mulder <[email protected]> Author: Lee moon soo <[email protected]> Closes #170 from Leemoonsoo/websocket and squashes the following commits: 11a302a [Lee moon soo] Check text in more safe way 3cf839d [Lee moon soo] Merge pull request #2 from sjoerdmulder/websocket 7f8bc47 [Sjoerd Mulder] Cleanup of Javascript logic and Server code detecting the correct port 412927f [Lee moon soo] Handle large message f56e417 [Lee moon soo] Add license header 806db9b [Lee moon soo] Remove websocket addr/port configuration 6180ed3 [Lee moon soo] Update README 85d14a0 [Lee moon soo] Create notebookserver instance manually a7b82aa [Lee moon soo] Initial implementation of Websocket inside of Jetty server Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/3a42a28b Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/3a42a28b Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/3a42a28b Branch: refs/heads/master Commit: 3a42a28b01f9a3faf88b5a82b2901af8fc4a16a5 Parents: b9583c6 Author: Lee moon soo <[email protected]> Authored: Mon Aug 3 05:01:00 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Wed Aug 5 15:18:01 2015 +0900 ---------------------------------------------------------------------- README.md | 2 +- conf/zeppelin-site.xml.template | 17 +- zeppelin-server/pom.xml | 22 +- .../zeppelin/server/AppScriptServlet.java | 95 -------- .../apache/zeppelin/server/ZeppelinServer.java | 54 ++--- .../apache/zeppelin/socket/NotebookServer.java | 236 ++++++++++--------- .../apache/zeppelin/socket/NotebookSocket.java | 73 ++++++ .../zeppelin/socket/NotebookSocketListener.java | 26 ++ .../socket/SslWebSocketServerFactory.java | 76 ------ .../java/org/apache/zeppelin/ZeppelinIT.java | 46 ++-- .../zeppelin/rest/AbstractTestRestApi.java | 1 - .../src/components/baseUrl/baseUrl.service.js | 46 +--- .../websocketEvents/websocketEvents.factory.js | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 22 -- 14 files changed, 303 insertions(+), 415 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 67d530e..d565858 100644 --- a/README.md +++ b/README.md @@ -116,7 +116,7 @@ Yarn ### Run ./bin/zeppelin-daemon.sh start - browse localhost:8080 in your browser. 8081 port should be accessible for websocket connection. + browse localhost:8080 in your browser. For configuration details check __./conf__ subdirectory. http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 8d0a7f1..13e4d1d 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -28,22 +28,7 @@ <property> <name>zeppelin.server.port</name> <value>8080</value> - <description>Server port. port+1 is used for web socket.</description> -</property> - -<property> - <name>zeppelin.websocket.addr</name> - <value>0.0.0.0</value> - <description>Testing websocket address</description> -</property> - -<!-- If the port value is negative, then it'll default to the server - port + 1. - --> -<property> - <name>zeppelin.websocket.port</name> - <value>-1</value> - <description>Testing websocket port</description> + <description>Server port.</description> </property> <property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index e85a3ae..2b43e1b 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -95,12 +95,6 @@ <version>${cxf.version}</version> </dependency> - <dependency> - <groupId>org.java-websocket</groupId> - <artifactId>Java-WebSocket</artifactId> - <version>1.3.0</version> - </dependency> - <!-- Swagger --> <dependency> <groupId>com.wordnik</groupId> @@ -297,19 +291,9 @@ </dependency> <dependency> - <groupId>org.atmosphere</groupId> - <artifactId>atmosphere-jersey</artifactId> - <version>2.2.0</version> - <exclusions> - <exclusion> - <groupId>com.sun.jersey</groupId> - <artifactId>jersey-server</artifactId> - </exclusion> - <exclusion> - <groupId>javax.ws.rs</groupId> - <artifactId>javax.ws.rs-api</artifactId> - </exclusion> - </exclusions> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-servlet</artifactId> + <version>1.13</version> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java deleted file mode 100644 index 7a31461..0000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/AppScriptServlet.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.server; - -import java.io.IOException; -import java.io.InputStream; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -import org.eclipse.jetty.servlet.DefaultServlet; -import org.eclipse.jetty.util.resource.Resource; - -/** - * Simple servlet to dynamically set the Websocket port - * in the JavaScript sent to the client - */ -public class AppScriptServlet extends DefaultServlet { - - // Hash containing the possible scripts that contain the getPort() - // function originally defined in app.js - private static Set<String> scriptPaths = new HashSet<String>( - Arrays.asList( - "/scripts/scripts.js", - "/components/baseUrl/baseUrl.js" - ) - ); - - private int websocketPort; - - public AppScriptServlet(int websocketPort) { - this.websocketPort = websocketPort; - } - - @Override - protected void doGet(HttpServletRequest request, HttpServletResponse response) - throws ServletException, - IOException { - - // Process all requests not for the app script to the parent - // class - String uri = request.getRequestURI(); - if (!scriptPaths.contains(uri)) { - super.doGet(request, response); - return; - } - - // Read the script file chunk by chunk - Resource scriptFile = getResource(uri); - InputStream is = scriptFile.getInputStream(); - StringBuffer script = new StringBuffer(); - byte[] buffer = new byte[1024]; - while (is.available() > 0) { - int numRead = is.read(buffer); - if (numRead <= 0) { - break; - } - script.append(new String(buffer, 0, numRead, "UTF-8")); - } - - // Replace the getPort function to return the proper value - String startReplaceString = "/* @preserve AppScriptServlet - getPort */"; - String endReplaceString = "/* @preserve AppScriptServlet - close */"; - - int startIndex = script.indexOf(startReplaceString); - int endIndex = script.indexOf(endReplaceString, startIndex); - - if (startIndex >= 0 && endIndex >= 0) { - String replaceString = "this.getPort=function(){return " + websocketPort + "};"; - script.replace(startIndex, endIndex + endReplaceString.length(), replaceString); - } - - response.getWriter().println(script.toString()); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 2bd23bb..ad1d907 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -26,6 +26,7 @@ import java.util.Set; import javax.net.ssl.SSLContext; import javax.servlet.DispatcherType; +import javax.servlet.Servlet; import javax.ws.rs.core.Application; import org.apache.cxf.jaxrs.servlet.CXFNonSpringJaxrsServlet; @@ -40,13 +41,14 @@ import org.apache.zeppelin.rest.NotebookRestApi; import org.apache.zeppelin.rest.ZeppelinRestApi; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.socket.NotebookServer; -import org.apache.zeppelin.socket.SslWebSocketServerFactory; +import org.eclipse.jetty.server.AbstractConnector; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.bio.SocketConnector; import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.server.session.SessionHandler; -import org.eclipse.jetty.server.ssl.SslSocketConnector; +import org.eclipse.jetty.server.ssl.SslSelectChannelConnector; +import org.eclipse.jetty.servlet.DefaultServlet; import org.eclipse.jetty.servlet.FilterHolder; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; @@ -83,8 +85,6 @@ public class ZeppelinServer extends Application { conf.setProperty("args", args); jettyServer = setupJettyServer(conf); - notebookServer = setupNotebookServer(conf); - notebookServer.start(); // REST api final ServletContextHandler restApi = setupRestApiContextHandler(); @@ -93,17 +93,18 @@ public class ZeppelinServer extends Application { */ final ServletContextHandler swagger = setupSwaggerContextHandler(conf); + // Notebook server + final ServletContextHandler notebook = setupNotebookServer(conf); + // Web UI - LOG.info("Create zeppelin websocket on {}:{}", notebookServer.getAddress() - .getAddress(), notebookServer.getPort()); - final WebAppContext webApp = setupWebAppContext(conf, notebookServer.getPort()); + final WebAppContext webApp = setupWebAppContext(conf); //Below is commented since zeppelin-docs module is removed. //final WebAppContext webAppSwagg = setupWebAppSwagger(conf); // add all handlers ContextHandlerCollection contexts = new ContextHandlerCollection(); //contexts.setHandlers(new Handler[]{swagger, restApi, webApp, webAppSwagg}); - contexts.setHandlers(new Handler[]{swagger, restApi, webApp}); + contexts.setHandlers(new Handler[]{swagger, restApi, notebook, webApp}); jettyServer.setHandler(contexts); LOG.info("Start zeppelin server"); @@ -114,10 +115,7 @@ public class ZeppelinServer extends Application { @Override public void run() { LOG.info("Shutting down Zeppelin Server ... "); try { - notebook.getInterpreterFactory().close(); - jettyServer.stop(); - notebookServer.stop(); } catch (Exception e) { LOG.error("Error while stopping servlet container", e); } @@ -142,12 +140,12 @@ public class ZeppelinServer extends Application { private static Server setupJettyServer(ZeppelinConfiguration conf) throws Exception { - SocketConnector connector; + AbstractConnector connector; if (conf.useSsl()) { - connector = new SslSocketConnector(getSslContextFactory(conf)); + connector = new SslSelectChannelConnector(getSslContextFactory(conf)); } else { - connector = new SocketConnector(); + connector = new SelectChannelConnector(); } // Set some timeout options to make debugging easier. @@ -163,20 +161,22 @@ public class ZeppelinServer extends Application { return server; } - private static NotebookServer setupNotebookServer(ZeppelinConfiguration conf) + private static ServletContextHandler setupNotebookServer(ZeppelinConfiguration conf) throws Exception { - NotebookServer server = new NotebookServer(conf.getWebSocketAddress(), conf.getWebSocketPort()); + notebookServer = new NotebookServer(); + final ServletHolder servletHolder = new ServletHolder(notebookServer); + servletHolder.setInitParameter("maxTextMessageSize", "1024000"); - // Default WebSocketServer uses unencrypted connector, so only need to - // change the connector if SSL should be used. - if (conf.useSsl()) { - SslWebSocketServerFactory wsf = new SslWebSocketServerFactory(getSslContext(conf)); - wsf.setNeedClientAuth(conf.useClientAuth()); - server.setWebSocketFactory(wsf); - } + final ServletContextHandler cxfContext = new ServletContextHandler( + ServletContextHandler.SESSIONS); - return server; + cxfContext.setSessionHandler(new SessionHandler()); + cxfContext.setContextPath("/"); + cxfContext.addServlet(servletHolder, "/ws/*"); + cxfContext.addFilter(new FilterHolder(CorsFilter.class), "/*", + EnumSet.allOf(DispatcherType.class)); + return cxfContext; } private static SslContextFactory getSslContextFactory(ZeppelinConfiguration conf) @@ -257,7 +257,7 @@ public class ZeppelinServer extends Application { } private static WebAppContext setupWebAppContext( - ZeppelinConfiguration conf, int websocketPort) { + ZeppelinConfiguration conf) { WebAppContext webApp = new WebAppContext(); File warPath = new File(conf.getString(ConfVars.ZEPPELIN_WAR)); @@ -273,7 +273,7 @@ public class ZeppelinServer extends Application { } // Explicit bind to root webApp.addServlet( - new ServletHolder(new AppScriptServlet(websocketPort)), + new ServletHolder(new DefaultServlet()), "/*" ); return webApp; http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 90a2a95..ed35ea1 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import javax.servlet.http.HttpServletRequest; + import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; @@ -39,9 +41,8 @@ import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.socket.Message.OP; -import org.java_websocket.WebSocket; -import org.java_websocket.handshake.ClientHandshake; -import org.java_websocket.server.WebSocketServer; +import org.eclipse.jetty.websocket.WebSocket; +import org.eclipse.jetty.websocket.WebSocketServlet; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,40 +55,36 @@ import com.google.gson.Gson; * * @author anthonycorbacho */ -public class NotebookServer extends WebSocketServer implements - JobListenerFactory, AngularObjectRegistryListener { +public class NotebookServer extends WebSocketServlet implements + NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { - private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - private static final String DEFAULT_ADDR = "0.0.0.0"; - private static final int DEFAULT_PORT = 8282; + private static final Logger LOG = LoggerFactory + .getLogger(NotebookServer.class); Gson gson = new Gson(); - Map<String, List<WebSocket>> noteSocketMap = new HashMap<String, List<WebSocket>>(); - List<WebSocket> connectedSockets = new LinkedList<WebSocket>(); - - public NotebookServer() { - super(new InetSocketAddress(DEFAULT_ADDR, DEFAULT_PORT)); - } - - public NotebookServer(String address, int port) { - super(new InetSocketAddress(address, port)); - } + Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<String, List<NotebookSocket>>(); + List<NotebookSocket> connectedSockets = new LinkedList<NotebookSocket>(); private Notebook notebook() { return ZeppelinServer.notebook; } @Override - public void onOpen(WebSocket conn, ClientHandshake handshake) { - LOG.info("New connection from {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); + public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) { + return new NotebookSocket(req, protocol, this); + } + + @Override + public void onOpen(NotebookSocket conn) { + LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), + conn.getRequest().getRemotePort()); synchronized (connectedSockets) { connectedSockets.add(conn); } } @Override - public void onMessage(WebSocket conn, String msg) { + public void onMessage(NotebookSocket conn, String msg) { Notebook notebook = notebook(); try { Message messagereceived = deserializeMessage(msg); @@ -132,7 +129,7 @@ public class NotebookServer extends WebSocketServer implements break; case PING: pong(); - break; + break; case ANGULAR_OBJECT_UPDATED: angularObjectUpdated(conn, notebook, messagereceived); break; @@ -146,17 +143,9 @@ public class NotebookServer extends WebSocketServer implements } @Override - public void onClose(WebSocket conn, int code, String reason, boolean remote) { - LOG.info("Closed connection to {} : {}", conn.getRemoteSocketAddress().getHostName(), conn - .getRemoteSocketAddress().getPort()); - removeConnectionFromAllNote(conn); - synchronized (connectedSockets) { - connectedSockets.remove(conn); - } - } - - @Override - public void onError(WebSocket conn, Exception message) { + public void onClose(NotebookSocket conn, int code, String reason) { + LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() + .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); synchronized (connectedSockets) { connectedSockets.remove(conn); @@ -172,12 +161,13 @@ public class NotebookServer extends WebSocketServer implements return gson.toJson(m); } - private void addConnectionToNote(String noteId, WebSocket socket) { + private void addConnectionToNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { - removeConnectionFromAllNote(socket); // make sure a socket relates only a single note. - List<WebSocket> socketList = noteSocketMap.get(noteId); + removeConnectionFromAllNote(socket); // make sure a socket relates only a + // single note. + List<NotebookSocket> socketList = noteSocketMap.get(noteId); if (socketList == null) { - socketList = new LinkedList<WebSocket>(); + socketList = new LinkedList<NotebookSocket>(); noteSocketMap.put(noteId, socketList); } @@ -187,9 +177,9 @@ public class NotebookServer extends WebSocketServer implements } } - private void removeConnectionFromNote(String noteId, WebSocket socket) { + private void removeConnectionFromNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { - List<WebSocket> socketList = noteSocketMap.get(noteId); + List<NotebookSocket> socketList = noteSocketMap.get(noteId); if (socketList != null) { socketList.remove(socket); } @@ -198,11 +188,11 @@ public class NotebookServer extends WebSocketServer implements private void removeNote(String noteId) { synchronized (noteSocketMap) { - List<WebSocket> socketList = noteSocketMap.remove(noteId); + List<NotebookSocket> socketList = noteSocketMap.remove(noteId); } } - private void removeConnectionFromAllNote(WebSocket socket) { + private void removeConnectionFromAllNote(NotebookSocket socket) { synchronized (noteSocketMap) { Set<String> keys = noteSocketMap.keySet(); for (String noteId : keys) { @@ -211,12 +201,12 @@ public class NotebookServer extends WebSocketServer implements } } - private String getOpenNoteId(WebSocket socket) { + private String getOpenNoteId(NotebookSocket socket) { String id = null; synchronized (noteSocketMap) { Set<String> keys = noteSocketMap.keySet(); for (String noteId : keys) { - List<WebSocket> sockets = noteSocketMap.get(noteId); + List<NotebookSocket> sockets = noteSocketMap.get(noteId); if (sockets.contains(socket)) { id = noteId; } @@ -225,7 +215,8 @@ public class NotebookServer extends WebSocketServer implements return id; } - private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) { + private void broadcastToNoteBindedInterpreter(String interpreterGroupId, + Message m) { Notebook notebook = notebook(); List<Note> notes = notebook.getAllNotes(); for (Note note : notes) { @@ -240,23 +231,31 @@ public class NotebookServer extends WebSocketServer implements private void broadcast(String noteId, Message m) { synchronized (noteSocketMap) { - List<WebSocket> socketLists = noteSocketMap.get(noteId); + List<NotebookSocket> socketLists = noteSocketMap.get(noteId); if (socketLists == null || socketLists.size() == 0) { return; } LOG.info("SEND >> " + m.op); - for (WebSocket conn : socketLists) { - conn.send(serializeMessage(m)); + for (NotebookSocket conn : socketLists) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } } } } private void broadcastAll(Message m) { synchronized (connectedSockets) { - for (WebSocket conn : connectedSockets) { - conn.send(serializeMessage(m)); + for (NotebookSocket conn : connectedSockets) { + try { + conn.send(serializeMessage(m)); + } catch (IOException e) { + LOG.error("socket error", e); + } } } } @@ -278,7 +277,8 @@ public class NotebookServer extends WebSocketServer implements broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } - private void sendNote(WebSocket conn, Notebook notebook, Message fromMessage) { + private void sendNote(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -296,7 +296,8 @@ public class NotebookServer extends WebSocketServer implements throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); - Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); + Map<String, Object> config = (Map<String, Object>) fromMessage + .get("config"); if (noteId == null) { return; } @@ -319,7 +320,8 @@ public class NotebookServer extends WebSocketServer implements } } - private boolean isCronUpdated(Map<String, Object> configA, Map<String, Object> configB) { + private boolean isCronUpdated(Map<String, Object> configA, + Map<String, Object> configB) { boolean cronUpdated = false; if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron").equals(configB.get("cron"))) { @@ -352,14 +354,16 @@ public class NotebookServer extends WebSocketServer implements broadcastNoteList(); } - private void updateParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void updateParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); - Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); + Map<String, Object> params = (Map<String, Object>) fromMessage + .get("params"); + Map<String, Object> config = (Map<String, Object>) fromMessage + .get("config"); final Note note = notebook.getNote(getOpenNoteId(conn)); Paragraph p = note.getParagraph(paragraphId); p.settings.setParams(params); @@ -370,8 +374,8 @@ public class NotebookServer extends WebSocketServer implements broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); } - private void removeParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void removeParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -385,7 +389,8 @@ public class NotebookServer extends WebSocketServer implements } } - private void completion(WebSocket conn, Notebook notebook, Message fromMessage) { + private void completion(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); @@ -404,6 +409,7 @@ public class NotebookServer extends WebSocketServer implements /** * When angular object updated from client + * * @param conn * @param notebook * @param fromMessage @@ -417,12 +423,12 @@ public class NotebookServer extends WebSocketServer implements AngularObject ao = null; boolean global = false; - - + // propagate change to (Remote) AngularObjectRegistry Note note = notebook.getNote(noteId); if (note != null) { - List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings(); + List<InterpreterSetting> settings = note.getNoteReplLoader() + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; @@ -433,7 +439,7 @@ public class NotebookServer extends WebSocketServer implements .getInterpreterGroup().getAngularObjectRegistry(); // first trying to get local registry - ao = angularObjectRegistry.get(varName, noteId); + ao = angularObjectRegistry.get(varName, noteId); if (ao == null) { // then try global registry ao = angularObjectRegistry.get(varName, null); @@ -454,26 +460,29 @@ public class NotebookServer extends WebSocketServer implements } } } - - if (global) { // broadcast change to all web session that uses related interpreter. + + if (global) { // broadcast change to all web session that uses related + // interpreter. for (Note n : notebook.getAllNotes()) { - List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings(); + List<InterpreterSetting> settings = note.getNoteReplLoader() + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; } - + if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting .getInterpreterGroup().getAngularObjectRegistry(); - this.broadcast(n.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.id())); + this.broadcast( + n.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", n.id())); } } } - } else { // broadcast to all web session for the note + } else { // broadcast to all web session for the note this.broadcast( note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) @@ -482,24 +491,25 @@ public class NotebookServer extends WebSocketServer implements } } - - private void moveParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void moveParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); + final int newIndex = (int) Double.parseDouble(fromMessage.get("index") + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.moveParagraph(paragraphId, newIndex); note.persist(); broadcastNote(note); } - private void insertParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { - final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); + private void insertParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { + final int index = (int) Double.parseDouble(fromMessage.get("index") + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.insertParagraph(index); @@ -507,9 +517,8 @@ public class NotebookServer extends WebSocketServer implements broadcastNote(note); } - - private void cancelParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void cancelParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -520,8 +529,8 @@ public class NotebookServer extends WebSocketServer implements p.abort(); } - private void runParagraph(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + private void runParagraph(NotebookSocket conn, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -531,13 +540,16 @@ public class NotebookServer extends WebSocketServer implements String text = (String) fromMessage.get("paragraph"); p.setText(text); p.setTitle((String) fromMessage.get("title")); - Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); + Map<String, Object> params = (Map<String, Object>) fromMessage + .get("params"); p.settings.setParams(params); - Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); + Map<String, Object> config = (Map<String, Object>) fromMessage + .get("config"); p.setConfig(config); // if it's the last paragraph, let's add a new one - boolean isTheLastParagraph = note.getLastParagraph().getId().equals(p.getId()); + boolean isTheLastParagraph = note.getLastParagraph().getId() + .equals(p.getId()); if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) { note.addParagraph(); } @@ -546,12 +558,12 @@ public class NotebookServer extends WebSocketServer implements try { note.run(paragraphId); - } - catch (Exception ex) { + } catch (Exception ex) { LOG.error("Exception from run", ex); if (p != null) { - p.setReturn(new InterpreterResult( - InterpreterResult.Code.ERROR, ex.getMessage()), ex); + p.setReturn( + new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), + ex); p.setStatus(Status.ERROR); } } @@ -572,12 +584,15 @@ public class NotebookServer extends WebSocketServer implements @Override public void onProgressUpdate(Job job, int progress) { - notebookServer.broadcast(note.id(), - new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); + notebookServer.broadcast( + note.id(), + new Message(OP.PROGRESS).put("id", job.getId()).put("progress", + job.progress())); } @Override - public void beforeStatusChange(Job job, Status before, Status after) {} + public void beforeStatusChange(Job job, Status before, Status after) { + } @Override public void afterStatusChange(Job job, Status before, Status after) { @@ -606,19 +621,22 @@ public class NotebookServer extends WebSocketServer implements private void pong() { } - private void sendAllAngularObjects(Note note, WebSocket conn) { - List<InterpreterSetting> settings = note.getNoteReplLoader().getInterpreterSettings(); + private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { + List<InterpreterSetting> settings = note.getNoteReplLoader() + .getInterpreterSettings(); if (settings == null || settings.size() == 0) { return; } for (InterpreterSetting intpSetting : settings) { - AngularObjectRegistry registry = intpSetting.getInterpreterGroup().getAngularObjectRegistry(); + AngularObjectRegistry registry = intpSetting.getInterpreterGroup() + .getAngularObjectRegistry(); List<AngularObject> objects = registry.getAllWithGlobal(note.id()); for (AngularObject object : objects) { conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", intpSetting.getInterpreterGroup().getId()) + .put("angularObject", object) + .put("interpreterGroupId", + intpSetting.getInterpreterGroup().getId()) .put("noteId", note.id()))); } } @@ -641,23 +659,25 @@ public class NotebookServer extends WebSocketServer implements if (object.getNoteId() != null && !note.id().equals(object.getNoteId())) { continue; } - + List<InterpreterSetting> intpSettings = note.getNoteReplLoader() .getInterpreterSettings(); - if (intpSettings.isEmpty()) continue; + if (intpSettings.isEmpty()) + continue; for (InterpreterSetting setting : intpSettings) { if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) { - broadcast(note.id(), new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + broadcast( + note.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); } } - } + } } - @Override public void onRemove(String interpreterGroupId, String name, String noteId) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java new file mode 100644 index 0000000..aceea45 --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; + +import org.eclipse.jetty.websocket.WebSocket; + +/** + * Notebook websocket + */ +public class NotebookSocket implements WebSocket.OnTextMessage{ + + private Connection connection; + private NotebookSocketListener listener; + private HttpServletRequest request; + private String protocol; + + + public NotebookSocket(HttpServletRequest req, String protocol, + NotebookSocketListener listener) { + this.listener = listener; + this.request = req; + this.protocol = protocol; + } + + @Override + public void onClose(int closeCode, String message) { + listener.onClose(this, closeCode, message); + } + + @Override + public void onOpen(Connection connection) { + this.connection = connection; + listener.onOpen(this); + } + + @Override + public void onMessage(String message) { + listener.onMessage(this, message); + } + + + public HttpServletRequest getRequest() { + return request; + } + + public String getProtocol() { + return protocol; + } + + public void send(String serializeMessage) throws IOException { + connection.sendMessage(serializeMessage); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java new file mode 100644 index 0000000..77fed6e --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocketListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + +/** + * NoteboookSocket listener + */ +public interface NotebookSocketListener { + public void onClose(NotebookSocket socket, int code, String message); + public void onOpen(NotebookSocket socket); + public void onMessage(NotebookSocket socket, String message); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java deleted file mode 100644 index f44dc1f..0000000 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/SslWebSocketServerFactory.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.socket; - -import java.io.IOException; -import java.nio.channels.ByteChannel; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.util.concurrent.ExecutorService; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - -import org.java_websocket.SSLSocketChannel2; -import org.java_websocket.server.DefaultSSLWebSocketServerFactory; - -/** - * Extension of the java_websocket library's DefaultSslWebSocketServerFactory - * to require client side authentication during the SSL handshake - */ -public class SslWebSocketServerFactory - extends DefaultSSLWebSocketServerFactory { - - protected boolean needClientAuth; - - public SslWebSocketServerFactory(SSLContext sslcontext) { - super(sslcontext); - initAttributes(); - } - - public SslWebSocketServerFactory( - SSLContext sslcontext, - ExecutorService exec) { - - super(sslcontext, exec); - initAttributes(); - } - - protected void initAttributes() { - this.needClientAuth = false; - } - - @Override - public ByteChannel wrapChannel(SocketChannel channel, SelectionKey key) - throws IOException { - - SSLEngine sslEngine = sslcontext.createSSLEngine(); - sslEngine.setUseClientMode(false); - sslEngine.setNeedClientAuth(needClientAuth); - return new SSLSocketChannel2( channel, sslEngine, exec, key ); - } - - public boolean getNeedClientAuth() { - return needClientAuth; - } - - public void setNeedClientAuth(boolean needClientAuth) { - this.needClientAuth = needClientAuth; - } -} - http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java index 779396c..b170a95 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/ZeppelinIT.java @@ -150,6 +150,20 @@ public class ZeppelinIT { return null != System.getenv("CI"); } + boolean waitForText(final String txt, final By by) { + try { + new WebDriverWait(driver, 5).until(new ExpectedCondition<Boolean>() { + @Override + public Boolean apply(WebDriver d) { + return txt.equals(driver.findElement(by).getText()); + } + }); + return true; + } catch (TimeoutException e) { + return false; + } + } + @Test public void testAngularDisplay() throws InterruptedException{ if (!endToEndTestEnabled()) { @@ -176,8 +190,8 @@ public class ZeppelinIT { waitForParagraph(1, "FINISHED"); // check expected text - assertEquals("BindingTest__", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest__", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Bind variable @@ -190,8 +204,8 @@ public class ZeppelinIT { waitForParagraph(2, "FINISHED"); // check expected text - assertEquals("BindingTest_1_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_1_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* @@ -206,8 +220,8 @@ public class ZeppelinIT { waitForParagraph(3, "FINISHED"); // check expected text - assertEquals("myVar=1", driver.findElement(By.xpath( - getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText()); + waitForText("myVar=1", By.xpath( + getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")); /* * Click element @@ -216,8 +230,8 @@ public class ZeppelinIT { getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click(); // check expected text - assertEquals("BindingTest_2_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_2_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Register watcher @@ -242,13 +256,13 @@ public class ZeppelinIT { getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).click(); // check expected text - assertEquals("BindingTest_3_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_3_", By.xpath( + getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); waitForParagraph(3, "FINISHED"); // check expected text by watcher - assertEquals("myVar=3", driver.findElement(By.xpath( - getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")).getText()); + waitForText("myVar=3", By.xpath( + getParagraphXPath(3) + "//div[@ng-bind=\"paragraph.result.msg\"]")); /* * Unbind @@ -261,8 +275,8 @@ public class ZeppelinIT { waitForParagraph(5, "FINISHED"); // check expected text - assertEquals("BindingTest__", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest__", + By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); /* * Bind again and see rebind works. @@ -272,8 +286,8 @@ public class ZeppelinIT { waitForParagraph(2, "FINISHED"); // check expected text - assertEquals("BindingTest_1_", driver.findElement(By.xpath( - getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")).getText()); + waitForText("BindingTest_1_", + By.xpath(getParagraphXPath(1) + "//div[@id=\"angularTestButton\"]")); System.out.println("testCreateNotebook Test executed"); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 744c1e0..393dc7b 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -187,7 +187,6 @@ public abstract class AbstractTestRestApi { protected static void shutDown() throws Exception { if (!wasRunning) { LOG.info("Terminating test Zeppelin..."); - ZeppelinServer.notebookServer.stop(); ZeppelinServer.jettyServer.stop(); executor.shutdown(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-web/src/components/baseUrl/baseUrl.service.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/components/baseUrl/baseUrl.service.js b/zeppelin-web/src/components/baseUrl/baseUrl.service.js index 662d88f..f5eb2df 100644 --- a/zeppelin-web/src/components/baseUrl/baseUrl.service.js +++ b/zeppelin-web/src/components/baseUrl/baseUrl.service.js @@ -15,52 +15,32 @@ angular.module('zeppelinWebApp').service('baseUrlSrv', function() { - /** Get the current port of the websocket - * - * When running Zeppelin, the body of this function will be dynamically - * overridden with the AppScriptServlet from zeppelin-site.xml config value. - * - * If the config value is not defined, it defaults to the HTTP port + 1 - * - * In the case of running "grunt serve", this function will appear - * as is. - */ - - /* @preserve AppScriptServlet - getPort */ this.getPort = function() { var port = Number(location.port); - if (location.protocol !== 'https:' && !port) { - port = 80; - } else if (location.protocol === 'https:' && !port) { - port = 443; - } else if (port === 3333 || port === 9000) { - port = 8080; - } - return port + 1; - }; - /* @preserve AppScriptServlet - close */ - - this.getWebsocketProtocol = function() { - return location.protocol === 'https:' ? 'wss' : 'ws'; - }; - - this.getRestApiBase = function() { - var port = Number(location.port); if (!port) { port = 80; if (location.protocol === 'https:') { port = 443; } } - + //Exception for when running locally via grunt if (port === 3333 || port === 9000) { port = 8080; } - return location.protocol + '//' + location.hostname + ':' + port + skipTrailingSlash(location.pathname) + '/api'; + return port; + }; + + this.getWebsocketUrl = function() { + var wsProtocol = location.protocol === 'https:' ? 'wss:' : 'ws:'; + return wsProtocol + '//' + location.hostname + ':' + this.getPort() + '/ws'; }; - + + this.getRestApiBase = function() { + return location.protocol + '//' + location.hostname + ':' + this.getPort() + skipTrailingSlash(location.pathname) + '/api'; + }; + var skipTrailingSlash = function(path) { return path.replace(/\/$/, ''); }; -}); \ No newline at end of file +}); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index 731266f..6d9f177 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -16,7 +16,7 @@ angular.module('zeppelinWebApp').factory('websocketEvents', function($rootScope, $websocket, baseUrlSrv) { var websocketCalls = {}; - websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketProtocol() + '://' + location.hostname + ':' + baseUrlSrv.getPort()); + websocketCalls.ws = $websocket(baseUrlSrv.getWebsocketUrl()); websocketCalls.ws.onOpen(function() { console.log('Websocket created'); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/3a42a28b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 086c15e..223dc70 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -268,25 +268,6 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getInt(ConfVars.ZEPPELIN_PORT); } - public String getWebSocketAddress() { - return getString(ConfVars.ZEPPELIN_WEBSOCKET_ADDR); - } - - public int getWebSocketPort() { - int port = getInt(ConfVars.ZEPPELIN_WEBSOCKET_PORT); - int serverPort = getServerPort(); - - if (port < 0) { - if (serverPort <= 0) { - return 0; - } else { - return serverPort + 1; - } - } else { - return port; - } - } - public String getKeyStorePath() { return getRelativeDir( String.format("%s/%s", @@ -389,9 +370,6 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_HOME("zeppelin.home", "../"), ZEPPELIN_ADDR("zeppelin.server.addr", "0.0.0.0"), ZEPPELIN_PORT("zeppelin.server.port", 8080), - // negative websocket port denotes that server port + 1 should be used - ZEPPELIN_WEBSOCKET_ADDR("zeppelin.websocket.addr", "0.0.0.0"), - ZEPPELIN_WEBSOCKET_PORT("zeppelin.websocket.port", -1), ZEPPELIN_SSL("zeppelin.ssl", false), ZEPPELIN_SSL_CLIENT_AUTH("zeppelin.ssl.client.auth", false), ZEPPELIN_SSL_KEYSTORE_PATH("zeppelin.ssl.keystore.path", "keystore"),
