This is an automated email from the ASF dual-hosted git repository. kuanhsun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push: new 1f6eb4d6 SUBMARINE-1069. Websocket in Submarine Server 1f6eb4d6 is described below commit 1f6eb4d6b581eb64fa827b0ff02902a10fab3884 Author: featherchen <garychen0975321...@gmail.com> AuthorDate: Fri May 20 21:37:40 2022 +0800 SUBMARINE-1069. Websocket in Submarine Server ### What is this PR for? <!-- A few sentences describing the overall goals of the pull request's commits. First time? Check out the contributing guide - https://submarine.apache.org/contribution/contributions.html --> Considering future developing, extend websocket in Sebmarine Server. In this PR, I used the source code of old notebook websocket server(which I didn't remove in this PR, but should remove it in the future refactor process) to build three websocket server and related test. I name the url of each server is /ws/notebook/, /ws/experiment/, /ws/environment/ And I rename the old websocket url as /wss ### What type of PR is it? Feature ### Todos * [x] - automatic test ### What is the Jira issue? <!-- * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE/ * Put link here, and add [SUBMARINE-*Jira number*] in PR title, eg. `SUBMARINE-23. PR title` --> https://issues.apache.org/jira/browse/SUBMARINE-1069 ### How should this be tested? <!-- * First time? Setup Travis CI as described on https://submarine.apache.org/contribution/contributions.html#continuous-integration * Strongly recommended: add automated unit tests for any new or changed behavior * Outline any manual steps to test the PR here. --> By test in submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket ### Screenshots (if appropriate) ![Screenshot from 2022-04-19 20-06-41](https://user-images.githubusercontent.com/57944334/165305850-64638f40-8088-40c1-9b0c-8ac85df6e525.png) ![Screenshot from 2022-04-19 20-07-57](https://user-images.githubusercontent.com/57944334/165305859-f4533154-6f1c-4f5b-ab0d-0d5bd36f17b9.png) ![Screenshot from 2022-04-19 20-08-19](https://user-images.githubusercontent.com/57944334/165305867-64f6b22b-a3e7-4d0d-9786-772006e771a8.png) ### Questions: * Do the license files need updating? No * Are there breaking changes for older versions? No * Does this need new documentation? Maybe Author: featherchen <garychen0975321...@gmail.com> Signed-off-by: kuanhsun <kuanh...@apache.org> Closes #852 from featherchen/SUBMARINE-1069 and squashes the following commits: 0afab660 [featherchen] fix comment c55f250e [featherchen] fix bugs 2c544392 [featherchen] fix bugs 589626f3 [featherchen] add test bfddc094 [featherchen] add test ef0dfb87 [featherchen] add test f5e7d352 [featherchen] modify test d08c8892 [featherchen] modify test cdac6a21 [featherchen] delete redudent import f72b19e4 [featherchen] fix bug cc611b05 [featherchen] set up three websocket 1ecc577d [featherchen] extend websocket component 9011c0d8 [featherchen] test 9c218d83 [featherchen] trivial 84412201 [featherchen] change 57a0a333 [featherchen] LOG problem --- dev-support/maven-config/checkstyle.xml | 0 submarine-server/server-core/pom.xml | 1 + .../apache/submarine/server/SubmarineServer.java | 47 +++++-- .../server/websocket/BasicWebSocketCreator.java | 39 ++++++ .../server/websocket/ConnectionManager.java | 102 +++++++++++++++ .../server/websocket/DateJsonDeserializer.java | 54 ++++++++ .../apache/submarine/server/websocket/Message.java | 82 ++++++++++++ .../server/websocket/WebSocketHandler.java | 85 ++++++++++++ .../server/websocket/WebSocketListener.java | 26 ++++ .../server/websocket/WebSocketServer.java | 142 +++++++++++++++++++++ .../server/AbstractSubmarineServerTest.java | 21 ++- .../EnvironmentWebsocketTest.java} | 24 +++- .../ExperimentWebsocketTest.java} | 26 ++-- .../NotebookWebsocketTest.java} | 23 +++- .../workbench/websocket/NotebookServerTest.java | 2 +- 15 files changed, 638 insertions(+), 36 deletions(-) diff --git a/dev-support/maven-config/checkstyle.xml b/dev-support/maven-config/checkstyle.xml old mode 100644 new mode 100755 diff --git a/submarine-server/server-core/pom.xml b/submarine-server/server-core/pom.xml index 6a9e6461..a1a956e6 100644 --- a/submarine-server/server-core/pom.xml +++ b/submarine-server/server-core/pom.xml @@ -437,6 +437,7 @@ <groupId>joda-time</groupId> <artifactId>joda-time</artifactId> </dependency> + </dependencies> <build> diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java index afb1f34a..92ce425a 100644 --- a/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/SubmarineServer.java @@ -21,6 +21,7 @@ package org.apache.submarine.server; import org.apache.log4j.PropertyConfigurator; import org.apache.submarine.server.rest.provider.YamlEntityProvider; import org.apache.submarine.server.workbench.websocket.NotebookServer; +import org.apache.submarine.server.websocket.WebSocketServer; import org.apache.submarine.commons.cluster.ClusterServer; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.server.Handler; @@ -105,6 +106,9 @@ public class SubmarineServer extends ResourceConfig { bindAsContract(NotebookServer.class) .to(WebSocketServlet.class) .in(Singleton.class); + bindAsContract(WebSocketServer.class) + .to(WebSocketServlet.class) + .in(Singleton.class); } }); @@ -117,13 +121,15 @@ public class SubmarineServer extends ResourceConfig { // Cluster Server is useless for submarine now. Shield it to improve performance. // setupClusterServer(); + setupWebSocketServer(webApp, conf, sharedServiceLocator); startServer(); + } @Inject public SubmarineServer() { packages("org.apache.submarine.server.workbench.rest", - "org.apache.submarine.server.rest" + "org.apache.submarine.server.rest" ); register(YamlEntityProvider.class); } @@ -170,7 +176,7 @@ public class SubmarineServer extends ResourceConfig { } private static WebAppContext setupWebAppContext(HandlerList handlers, - SubmarineConfiguration conf) { + SubmarineConfiguration conf) { WebAppContext webApp = new WebAppContext(); webApp.setContextPath("/"); File warPath = new File(conf.getString(SubmarineConfVars.ConfVars.WORKBENCH_WEB_WAR)); @@ -196,7 +202,7 @@ public class SubmarineServer extends ResourceConfig { webApp.addServlet(new ServletHolder(RefreshServlet.class), "/user/*"); webApp.addServlet(new ServletHolder(RefreshServlet.class), "/workbench/*"); - handlers.setHandlers(new Handler[] { webApp }); + handlers.setHandlers(new Handler[]{webApp}); return webApp; } @@ -223,9 +229,9 @@ public class SubmarineServer extends ResourceConfig { httpsConfig.addCustomizer(src); connector = new ServerConnector( - server, - new SslConnectionFactory(getSslContextFactory(conf), HttpVersion.HTTP_1_1.asString()), - new HttpConnectionFactory(httpsConfig)); + server, + new SslConnectionFactory(getSslContextFactory(conf), HttpVersion.HTTP_1_1.asString()), + new HttpConnectionFactory(httpsConfig)); } else { connector = new ServerConnector(server); } @@ -246,14 +252,37 @@ public class SubmarineServer extends ResourceConfig { } private static void setupNotebookServer(WebAppContext webapp, - SubmarineConfiguration conf, ServiceLocator serviceLocator) { + SubmarineConfiguration conf, ServiceLocator serviceLocator) { String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize(); final ServletHolder servletHolder = new ServletHolder(serviceLocator.getService(NotebookServer.class)); servletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize); final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); - webapp.addServlet(servletHolder, "/ws/*"); + webapp.addServlet(servletHolder, "/wss/*"); + } + + private static void setupWebSocketServer(WebAppContext webapp, + SubmarineConfiguration conf, ServiceLocator serviceLocator) { + String maxTextMessageSize = conf.getWebsocketMaxTextMessageSize(); + final ServletHolder notebookServletHolder = + new ServletHolder(serviceLocator.getService(WebSocketServer.class)); + notebookServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize); + + final ServletHolder experimentServletHolder = + new ServletHolder(serviceLocator.getService(WebSocketServer.class)); + experimentServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize); + + final ServletHolder environmentServletHolder = + new ServletHolder(serviceLocator.getService(WebSocketServer.class)); + environmentServletHolder.setInitParameter("maxTextMessageSize", maxTextMessageSize); + + + + final ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS); + webapp.addServlet(notebookServletHolder, "/ws/notebook/*"); + webapp.addServlet(experimentServletHolder, "/ws/experiment/*"); + webapp.addServlet(environmentServletHolder, "/ws/environment/*"); } private static void setupClusterServer() { @@ -331,7 +360,7 @@ public class SubmarineServer extends ResourceConfig { StringBuilder sbIndexBuf = new StringBuilder(); try (InputStreamReader reader = - new InputStreamReader(new FileInputStream(indexFile), "GBK"); + new InputStreamReader(new FileInputStream(indexFile), "GBK"); BufferedReader bufferedReader = new BufferedReader(reader);) { String lineTxt = null; while ((lineTxt = bufferedReader.readLine()) != null) { diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java new file mode 100644 index 00000000..cfe5be8d --- /dev/null +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/BasicWebSocketCreator.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.submarine.server.websocket; + +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.eclipse.jetty.websocket.servlet.WebSocketCreator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Responsible to create the WebSockets for WebSocketServer. + */ +public class BasicWebSocketCreator implements WebSocketCreator { + private static final Logger LOG = LoggerFactory.getLogger(BasicWebSocketCreator.class); + private WebSocketServer webSocketServer; + + public BasicWebSocketCreator(WebSocketServer webSocketServer) { + this.webSocketServer = webSocketServer; + } + public Object createWebSocket(ServletUpgradeRequest request, ServletUpgradeResponse response) { + return new WebSocketHandler(request.getHttpServletRequest(), "", webSocketServer); + } + +} diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java new file mode 100644 index 00000000..dec72775 --- /dev/null +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/ConnectionManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.submarine.server.websocket; + +import com.google.common.collect.Sets; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.eclipse.jetty.websocket.api.WebSocketException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Date; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Manager class for managing websocket connections. + */ +public class ConnectionManager { + private static final Logger LOG = LoggerFactory.getLogger(ConnectionManager.class); + private static final Gson gson = new GsonBuilder() + .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") + .registerTypeAdapter(Date.class, new DateJsonDeserializer()) + .setPrettyPrinting() + .create(); + + final Queue<WebSocketHandler> connectedSockets = new ConcurrentLinkedQueue<>(); + // user -> connection + final Map<String, Queue<WebSocketHandler>> userSocketMap = new ConcurrentHashMap<>(); + + public void addConnection(WebSocketHandler conn) { + connectedSockets.add(conn); + } + + public void removeConnection(WebSocketHandler conn) { + connectedSockets.remove(conn); + } + + public void addUserConnection(String user, WebSocketHandler conn) { + LOG.info("Add user connection {} for user: {}", conn, user); + conn.setUser(user); + if (userSocketMap.containsKey(user)) { + userSocketMap.get(user).add(conn); + } else { + Queue<WebSocketHandler> socketQueue = new ConcurrentLinkedQueue<>(); + socketQueue.add(conn); + userSocketMap.put(user, socketQueue); + } + } + + public void removeUserConnection(String user, WebSocketHandler conn) { + LOG.info("Remove user connection {} for user: {}", conn, user); + if (userSocketMap.containsKey(user)) { + userSocketMap.get(user).remove(conn); + } else { + LOG.warn("Closing connection that is absent in user connections"); + } + } + + protected String serializeMessage(Message m) { + return gson.toJson(m); + } + + public void broadcast(Message m) { + synchronized (connectedSockets) { + for (WebSocketHandler ns : connectedSockets) { + try { + ns.send(serializeMessage(m)); + } catch (IOException | WebSocketException e) { + LOG.error("Send error: " + m, e); + } + } + } + } + + public Set<String> getConnectedUsers() { + Set<String> connectedUsers = Sets.newHashSet(); + for (WebSocketHandler notebookSocket : connectedSockets) { + connectedUsers.add(notebookSocket.getUser()); + } + return connectedUsers; + } +} diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java new file mode 100644 index 00000000..6fe30e07 --- /dev/null +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/DateJsonDeserializer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.submarine.server.websocket; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; + +import java.lang.reflect.Type; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.Date; +import java.util.Locale; + +public class DateJsonDeserializer implements JsonDeserializer{ + private final String[] DATE_FORMATS = new String[] { + "yyyy-MM-dd'T'HH:mm:ssZ", + "MMM d, yyyy h:mm:ss a", + "MMM dd, yyyy HH:mm:ss", + "yyyy-MM-dd HH:mm:ss.SSS" + }; + + @Override + public Date deserialize(JsonElement jsonElement, Type typeOF, + JsonDeserializationContext context) throws JsonParseException { + for (String format : DATE_FORMATS) { + try { + return new SimpleDateFormat(format, Locale.US).parse(jsonElement.getAsString()); + } catch (ParseException e) { + throw new JsonParseException("Unparsable date: \"" + jsonElement.getAsString() + + "\". Supported formats: " + Arrays.toString(DATE_FORMATS)); + } catch (Exception e){ + e.printStackTrace(); + } + } + throw new RuntimeException("Unexpected Error in Deserialize Date"); + } +} diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java new file mode 100644 index 00000000..96de8a96 --- /dev/null +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/Message.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.submarine.server.websocket; + +import com.google.gson.Gson; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +public class Message { + /** + * Representation of event type. + */ + public enum OP { + ERROR_INFO, // [s-c] error information to be sent + NOTICE // [s-c] Notice + } + + private static final Gson gson = new Gson(); + public static final Message EMPTY = new Message(null); + + public OP op; + public Map<String, Object> data = new HashMap<>(); + + public Message(OP op) { + this.op = op; + } + + public Message put(String k, Object v) { + data.put(k, v); + return this; + } + + public Object get(String k) { + return data.get(k); + } + + public <T> T getType(String key) { + return (T) data.get(key); + } + + public <T> T getType(String key, Logger LOG) { + try { + return getType(key); + } catch (ClassCastException e) { + LOG.error("Failed to get " + key + " from message (Invalid type). " , e); + return null; + } + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("Message{"); + sb.append("data=").append(data); + sb.append(", op=").append(op); + sb.append('}'); + return sb.toString(); + } + + public String toJson() { + return gson.toJson(this); + } + + public static Message fromJson(String json) { + return gson.fromJson(json, Message.class); + } +} diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java new file mode 100644 index 00000000..c8d2f0a8 --- /dev/null +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketHandler.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.submarine.server.websocket; + +import org.apache.commons.lang.StringUtils; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketAdapter; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; + +/** + * Websocket handler. + */ +public class WebSocketHandler extends WebSocketAdapter { + private Session connection; + private WebSocketListener listener; + private HttpServletRequest request; + private String protocol; + private String user; + + public WebSocketHandler(HttpServletRequest req, String protocol, + WebSocketListener listener) { + this.listener = listener; + this.request = req; + this.protocol = protocol; + this.user = StringUtils.EMPTY; + } + + @Override + public void onWebSocketClose(int closeCode, String message) { + listener.onClose(this, closeCode, message); + } + + @Override + public void onWebSocketConnect(Session connection) { + this.connection = connection; + listener.onOpen(this); + } + + @Override + public void onWebSocketText(String message) { + listener.onMessage(this, message); + } + + public HttpServletRequest getRequest() { + return request; + } + + public String getProtocol() { + return protocol; + } + + public synchronized void send(String serializeMessage) throws IOException { + connection.getRemote().sendString(serializeMessage); + } + + public String getUser() { + return user; + } + + public void setUser(String user) { + this.user = user; + } + + @Override + public String toString() { + return request.getRemoteHost() + ":" + request.getRemotePort(); + } +} diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java new file mode 100644 index 00000000..79e319f5 --- /dev/null +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketListener.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.submarine.server.websocket; + +/** + * WebSocket listener. + */ +public interface WebSocketListener { + void onClose(WebSocketHandler socket, int code, String message); + void onOpen(WebSocketHandler socket); + void onMessage(WebSocketHandler socket, String message); +} diff --git a/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java new file mode 100644 index 00000000..2c1f536d --- /dev/null +++ b/submarine-server/server-core/src/main/java/org/apache/submarine/server/websocket/WebSocketServer.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.submarine.server.websocket; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.io.IOException; +import java.util.Date; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang.StringUtils; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; +import org.eclipse.jetty.websocket.servlet.WebSocketServlet; +import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Submarine websocket service. This class used setter injection because all servlet should have + * no-parameter constructor + */ +@ManagedObject +public class WebSocketServer extends WebSocketServlet + implements org.apache.submarine.server.websocket.WebSocketListener { + + /** + * Job manager service type. + */ + protected enum JobManagerServiceType { + JOB_MANAGER_PAGE("JOB_MANAGER_PAGE"); + private String serviceTypeKey; + + JobManagerServiceType(String serviceType) { + this.serviceTypeKey = serviceType; + } + + String getKey() { + return this.serviceTypeKey; + } + } + + private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class); + private static Gson gson = new GsonBuilder() + .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") + .registerTypeAdapter(Date.class, new DateJsonDeserializer()) + .setPrettyPrinting() + .create(); + + private static AtomicReference<WebSocketServer> self = new AtomicReference<>(); + + private ConnectionManager connectionManager; + + private ExecutorService executorService = Executors.newFixedThreadPool(10); + + public WebSocketServer() { + this.connectionManager = new ConnectionManager(); + WebSocketServer.self.set(this); + LOG.info("WebSocketServer instantiated: {}", this); + } + + @Override + public void configure(WebSocketServletFactory factory) { + factory.setCreator(new BasicWebSocketCreator(this)); + } + + @Override + public void onOpen(WebSocketHandler conn) { + LOG.info("New connection from {}", conn); + connectionManager.addConnection(conn); + } + + @Override + public void onMessage(WebSocketHandler conn, String msg) { + try { + LOG.info("Got Message: " + msg); + if (StringUtils.isEmpty(conn.getUser())) { + connectionManager.addUserConnection("FakeUser1", conn); + } + } catch (Exception e) { + LOG.error("Can't handle message: " + msg, e); + try { + conn.send(serializeMessage(new Message(Message.OP.ERROR_INFO).put( + "info", e.getMessage()))); + } catch (IOException iox) { + LOG.error("Fail to send error info", iox); + } + } + } + + @Override + public void onClose(WebSocketHandler conn, int code, String reason) { + LOG.info("Closed connection to {} ({}) {}", conn, code, reason); + connectionManager.removeConnection(conn); + connectionManager.removeUserConnection(conn.getUser(), conn); + } + + public ConnectionManager getConnectionManager() { + return connectionManager; + } + + protected Message deserializeMessage(String msg) { + return gson.fromJson(msg, Message.class); + } + + protected String serializeMessage(Message m) { + return gson.toJson(m); + } + + public void broadcast(Message m) { + connectionManager.broadcast(m); + } + + @ManagedAttribute + public Set<String> getConnectedUsers() { + return connectionManager.getConnectedUsers(); + } + + @ManagedOperation + public void sendMessage(String message) { + Message m = new Message(Message.OP.NOTICE); + m.data.put("notice", message); + connectionManager.broadcast(m); + } +} diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java index 7cd4fe51..eab31af7 100644 --- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java +++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/AbstractSubmarineServerTest.java @@ -75,7 +75,10 @@ public abstract class AbstractSubmarineServerTest { protected static final Logger LOG = LoggerFactory.getLogger(AbstractSubmarineServerTest.class); - static final String WEBSOCKET_API_URL = "/ws"; + static final String WEBSOCKET_API_URL = "/wss"; + static final String WEBSOCKET_NOTEBOOK_API_URL = "/ws/notebook"; + static final String WEBSOCKET_EXPERIMENT_API_URL = "/ws/experiment"; + static final String WEBSOCKET_ENVIRONMENT_API_URL = "/ws/environment"; static final String URL = getUrlToTest(); protected static final boolean WAS_RUNNING = checkIfServerIsRunning(); @@ -86,8 +89,20 @@ public abstract class AbstractSubmarineServerTest { "/api/" + RestConstants.V1 + "/" + RestConstants.ENVIRONMENT; protected static String ENV_NAME = "my-submarine-env"; - public static String getWebsocketApiUrlToTest() { - String websocketUrl = "ws://localhost:8080" + WEBSOCKET_API_URL; + public static String getWebsocketApiUrlToTest(String serverName) { + + String websocketUrl = "ws://localhost:8080"; + if (serverName.equals("wss")) { + websocketUrl = "ws://localhost:8080" + WEBSOCKET_API_URL; + } else if (serverName.equals("notebook")) { + websocketUrl = "ws://localhost:8080" + WEBSOCKET_NOTEBOOK_API_URL; + } + else if (serverName.equals("environment")) { + websocketUrl = "ws://localhost:8080" + WEBSOCKET_ENVIRONMENT_API_URL; + } + else if (serverName.equals("experiment")) { + websocketUrl = "ws://localhost:8080" + WEBSOCKET_EXPERIMENT_API_URL; + } if (System.getProperty("websocketUrl") != null) { websocketUrl = System.getProperty("websocketurl"); } diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java similarity index 78% copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java index a18e0cdd..af8149a5 100644 --- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java +++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/EnvironmentWebsocketTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.submarine.server.workbench.websocket; +package org.apache.submarine.server.websocket; import org.apache.submarine.server.AbstractSubmarineServerTest; import org.eclipse.jetty.websocket.api.Session; @@ -24,16 +24,22 @@ import org.eclipse.jetty.websocket.client.WebSocketClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URI; import java.util.concurrent.Future; -public class NotebookServerTest { +import static junit.framework.TestCase.assertEquals; + +public class EnvironmentWebsocketTest { + + private static final Logger LOG = LoggerFactory.getLogger(EnvironmentWebsocketTest.class); @BeforeClass public static void init() throws Exception { AbstractSubmarineServerTest.startUp( - NotebookServerTest.class.getSimpleName()); + EnvironmentWebsocketTest.class.getSimpleName()); } @AfterClass @@ -44,8 +50,9 @@ public class NotebookServerTest { @Test public void testWebsocketConnection() throws Exception{ URI uri = URI.create( - AbstractSubmarineServerTest.getWebsocketApiUrlToTest()); + AbstractSubmarineServerTest.getWebsocketApiUrlToTest("environment")); WebSocketClient client = new WebSocketClient(); + try { client.start(); // The socket that receives events @@ -70,21 +77,24 @@ public class NotebookServerTest { public void onWebSocketConnect(Session sess) { super.onWebSocketConnect(sess); - System.out.println("Socket Connected: " + sess); + LOG.info("Socket Connected: " + sess); } @Override public void onWebSocketText(String message) { super.onWebSocketText(message); - System.out.println("Received TEXT message: " + message); + LOG.info("Received TEXT message: " + message); + assertEquals(message, "Hello"); } @Override public void onWebSocketClose(int statusCode, String reason) { super.onWebSocketClose(statusCode, reason); - System.out.println("Socket Closed: [" + statusCode + "] " + reason); + LOG.info("Socket Closed: [" + statusCode + "] " + reason); + assertEquals(statusCode, StatusCode.NORMAL); + assertEquals(reason, "I'm done"); } @Override diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java similarity index 78% copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java index a18e0cdd..edd79099 100644 --- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java +++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/ExperimentWebsocketTest.java @@ -14,9 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.submarine.server.workbench.websocket; +package org.apache.submarine.server.websocket; -import org.apache.submarine.server.AbstractSubmarineServerTest; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.WebSocketAdapter; @@ -24,16 +23,21 @@ import org.eclipse.jetty.websocket.client.WebSocketClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URI; import java.util.concurrent.Future; +import org.apache.submarine.server.AbstractSubmarineServerTest; -public class NotebookServerTest { +import static junit.framework.TestCase.assertEquals; +public class ExperimentWebsocketTest { + + private static final Logger LOG = LoggerFactory.getLogger(ExperimentWebsocketTest.class); @BeforeClass public static void init() throws Exception { AbstractSubmarineServerTest.startUp( - NotebookServerTest.class.getSimpleName()); + ExperimentWebsocketTest.class.getSimpleName()); } @AfterClass @@ -44,8 +48,9 @@ public class NotebookServerTest { @Test public void testWebsocketConnection() throws Exception{ URI uri = URI.create( - AbstractSubmarineServerTest.getWebsocketApiUrlToTest()); + AbstractSubmarineServerTest.getWebsocketApiUrlToTest("experiment")); WebSocketClient client = new WebSocketClient(); + try { client.start(); // The socket that receives events @@ -70,21 +75,24 @@ public class NotebookServerTest { public void onWebSocketConnect(Session sess) { super.onWebSocketConnect(sess); - System.out.println("Socket Connected: " + sess); + LOG.info("Socket Connected: " + sess); } @Override public void onWebSocketText(String message) { super.onWebSocketText(message); - System.out.println("Received TEXT message: " + message); + LOG.info("Received TEXT message: " + message); + assertEquals(message, "Hello"); } @Override public void onWebSocketClose(int statusCode, String reason) { super.onWebSocketClose(statusCode, reason); - System.out.println("Socket Closed: [" + statusCode + "] " + reason); + LOG.info("Socket Closed: [" + statusCode + "] " + reason); + assertEquals(statusCode, StatusCode.NORMAL); + assertEquals(reason, "I'm done"); } @Override diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java similarity index 78% copy from submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java copy to submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java index a18e0cdd..d62e79ae 100644 --- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java +++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/websocket/NotebookWebsocketTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.submarine.server.workbench.websocket; +package org.apache.submarine.server.websocket; import org.apache.submarine.server.AbstractSubmarineServerTest; import org.eclipse.jetty.websocket.api.Session; @@ -24,16 +24,21 @@ import org.eclipse.jetty.websocket.client.WebSocketClient; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.net.URI; import java.util.concurrent.Future; -public class NotebookServerTest { +import static junit.framework.TestCase.assertEquals; +public class NotebookWebsocketTest { + + private static final Logger LOG = LoggerFactory.getLogger(NotebookWebsocketTest.class); @BeforeClass public static void init() throws Exception { AbstractSubmarineServerTest.startUp( - NotebookServerTest.class.getSimpleName()); + NotebookWebsocketTest.class.getSimpleName()); } @AfterClass @@ -44,8 +49,9 @@ public class NotebookServerTest { @Test public void testWebsocketConnection() throws Exception{ URI uri = URI.create( - AbstractSubmarineServerTest.getWebsocketApiUrlToTest()); + AbstractSubmarineServerTest.getWebsocketApiUrlToTest("notebook")); WebSocketClient client = new WebSocketClient(); + try { client.start(); // The socket that receives events @@ -70,21 +76,24 @@ public class NotebookServerTest { public void onWebSocketConnect(Session sess) { super.onWebSocketConnect(sess); - System.out.println("Socket Connected: " + sess); + LOG.info("Socket Connected: " + sess); } @Override public void onWebSocketText(String message) { super.onWebSocketText(message); - System.out.println("Received TEXT message: " + message); + LOG.info("Received TEXT message: " + message); + assertEquals(message, "Hello"); } @Override public void onWebSocketClose(int statusCode, String reason) { super.onWebSocketClose(statusCode, reason); - System.out.println("Socket Closed: [" + statusCode + "] " + reason); + LOG.info("Socket Closed: [" + statusCode + "] " + reason); + assertEquals(statusCode, StatusCode.NORMAL); + assertEquals(reason, "I'm done"); } @Override diff --git a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java b/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java index a18e0cdd..77aaa19d 100644 --- a/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java +++ b/submarine-server/server-core/src/test/java/org/apache/submarine/server/workbench/websocket/NotebookServerTest.java @@ -44,7 +44,7 @@ public class NotebookServerTest { @Test public void testWebsocketConnection() throws Exception{ URI uri = URI.create( - AbstractSubmarineServerTest.getWebsocketApiUrlToTest()); + AbstractSubmarineServerTest.getWebsocketApiUrlToTest("wss")); WebSocketClient client = new WebSocketClient(); try { client.start(); --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@submarine.apache.org For additional commands, e-mail: dev-h...@submarine.apache.org