This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push: new fb069f9 [ZEPPELIN-5535] Remove jetty websocket libs fb069f9 is described below commit fb069f92f65ccecca3dbcd03793febfa8d05652f Author: Philipp Dallig <philipp.dal...@gmail.com> AuthorDate: Tue Oct 12 15:25:13 2021 +0200 [ZEPPELIN-5535] Remove jetty websocket libs ### What is this PR for? This PR removes more jetty websocket libs from the code ### What type of PR is it? - Improvement ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-5535 ### How should this be tested? * CI ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Philipp Dallig <philipp.dal...@gmail.com> Closes #4254 from Reamer/custom_ZEPPELIN-5535 and squashes the following commits: 105cf35e2 [Philipp Dallig] Remove jetty websocket libs --- .../apache/zeppelin/socket/ConnectionManager.java | 66 ++++++---------------- .../org/apache/zeppelin/socket/NotebookSocket.java | 4 +- 2 files changed, 18 insertions(+), 52 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java index 2b565b5..51c8fdf 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/ConnectionManager.java @@ -33,7 +33,6 @@ import org.apache.zeppelin.common.Message; import org.apache.zeppelin.notebook.socket.WatcherMessage; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.util.WatcherSecurityKey; -import org.eclipse.jetty.websocket.api.WebSocketException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,8 +119,8 @@ public class ConnectionManager { public void removeNoteConnection(String noteId, NotebookSocket socket) { LOGGER.debug("Remove connection {} from note: {}", 
socket, noteId); synchronized (noteSocketMap) { - List<NotebookSocket> socketList = noteSocketMap.get(noteId); - if (socketList != null) { + List<NotebookSocket> socketList = noteSocketMap.getOrDefault(noteId, Collections.emptyList()); + if (!socketList.isEmpty()) { socketList.remove(socket); } checkCollaborativeStatus(noteId, socketList); @@ -168,24 +167,13 @@ public class ConnectionManager { synchronized (noteSocketMap) { Set<String> noteIds = noteSocketMap.keySet(); for (String noteId : noteIds) { - removeConnectionFromNote(noteId, socket); + removeNoteConnection(noteId, socket); } } } - private void removeConnectionFromNote(String noteId, NotebookSocket socket) { - LOGGER.debug("Remove connection {} from note: {}", socket, noteId); - synchronized (noteSocketMap) { - List<NotebookSocket> socketList = noteSocketMap.get(noteId); - if (socketList != null) { - socketList.remove(socket); - } - checkCollaborativeStatus(noteId, socketList); - } - } - private void checkCollaborativeStatus(String noteId, List<NotebookSocket> socketList) { - if (!collaborativeModeEnable) { + if (!collaborativeModeEnable.booleanValue()) { return; } boolean collaborativeStatusNew = socketList.size() > 1; @@ -217,28 +205,28 @@ public class ConnectionManager { for (NotebookSocket ns : connectedSockets) { try { ns.send(serializeMessage(m)); - } catch (IOException | WebSocketException e) { - LOGGER.error("Send error: " + m, e); + } catch (IOException e) { + LOGGER.error("Send error: {}", m, e); } } } } public void broadcast(String noteId, Message m) { - List<NotebookSocket> socketsToBroadcast = Collections.emptyList(); + List<NotebookSocket> socketsToBroadcast; synchronized (noteSocketMap) { broadcastToWatchers(noteId, StringUtils.EMPTY, m); List<NotebookSocket> socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.size() == 0) { + if (socketLists == null || socketLists.isEmpty()) { return; } socketsToBroadcast = new ArrayList<>(socketLists); } - LOGGER.debug("SEND 
>> " + m); + LOGGER.debug("SEND >> {}", m); for (NotebookSocket conn : socketsToBroadcast) { try { conn.send(serializeMessage(m)); - } catch (IOException | WebSocketException e) { + } catch (IOException e) { LOGGER.error("socket error", e); } } @@ -254,7 +242,7 @@ public class ConnectionManager { .message(serializeMessage(message)) .build() .toJson()); - } catch (IOException | WebSocketException e) { + } catch (IOException e) { LOGGER.error("Cannot broadcast message to watcher", e); } } @@ -262,24 +250,24 @@ public class ConnectionManager { } public void broadcastExcept(String noteId, Message m, NotebookSocket exclude) { - List<NotebookSocket> socketsToBroadcast = Collections.emptyList(); + List<NotebookSocket> socketsToBroadcast; synchronized (noteSocketMap) { broadcastToWatchers(noteId, StringUtils.EMPTY, m); List<NotebookSocket> socketLists = noteSocketMap.get(noteId); - if (socketLists == null || socketLists.size() == 0) { + if (socketLists == null || socketLists.isEmpty()) { return; } socketsToBroadcast = new ArrayList<>(socketLists); } - LOGGER.debug("SEND >> " + m); + LOGGER.debug("SEND >> {}", m); for (NotebookSocket conn : socketsToBroadcast) { if (exclude.equals(conn)) { continue; } try { conn.send(serializeMessage(m)); - } catch (IOException | WebSocketException e) { + } catch (IOException e) { LOGGER.error("socket error", e); } } @@ -301,7 +289,7 @@ public class ConnectionManager { try { conn.send(serializedMsg); - } catch (IOException | WebSocketException e) { + } catch (IOException e) { LOGGER.error("Cannot broadcast message to conn", e); } } @@ -331,7 +319,7 @@ public class ConnectionManager { public void unicast(Message m, NotebookSocket conn) { try { conn.send(serializeMessage(m)); - } catch (IOException | WebSocketException e) { + } catch (IOException e) { LOGGER.error("socket error", e); } broadcastToWatchers(StringUtils.EMPTY, StringUtils.EMPTY, m); @@ -404,26 +392,6 @@ public class ConnectionManager { } } - private void 
broadcastNewParagraph(Note note, Paragraph para) { - LOGGER.info("Broadcasting paragraph on run call instead of note."); - int paraIndex = note.getParagraphs().indexOf(para); - broadcast(note.getId(), - new Message(Message.OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex)); - } - - // public void broadcastNoteList(AuthenticationInfo subject, Set<String> userAndRoles) { - // if (subject == null) { - // subject = new AuthenticationInfo(StringUtils.EMPTY); - // } - // //send first to requesting user - // List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles); - // multicastToUser(subject.getUser(), new Message(Message.OP.NOTES_INFO) - // .put("notes", notesInfo)); - // //to others afterwards - // broadcastNoteListExcept(notesInfo, subject); - // } - - private void broadcastNoteForms(Note note) { GUI formsSettings = new GUI(); formsSettings.setForms(note.getNoteForms()); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java index 8d06770..c9f0a64 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookSocket.java @@ -17,8 +17,6 @@ package org.apache.zeppelin.socket; import org.apache.commons.lang3.StringUtils; -import org.eclipse.jetty.websocket.api.WebSocketAdapter; - import java.io.IOException; import java.util.Map; @@ -27,7 +25,7 @@ import javax.websocket.Session; /** * Notebook websocket. */ -public class NotebookSocket extends WebSocketAdapter { +public class NotebookSocket { private Session session; private Map<String, Object> headers; private String user;