http://git-wip-us.apache.org/repos/asf/zeppelin/blob/0d746fa2/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 8606a74..a3fce8f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,26 +20,6 @@ import com.google.common.base.Strings; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; -import java.io.IOException; -import java.lang.reflect.Type; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.servlet.http.HttpServletRequest; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -93,16 +73,41 @@ import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Zeppelin websocket service. */ +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.lang.reflect.Type; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Zeppelin websocket service. + */ public class NotebookServer extends WebSocketServlet implements NotebookSocketListener, - AngularObjectRegistryListener, - RemoteInterpreterProcessListener, - ApplicationEventListener, - ParagraphJobListener, - NotebookServerMBean { + AngularObjectRegistryListener, + RemoteInterpreterProcessListener, + ApplicationEventListener, + ParagraphJobListener, + NotebookServerMBean { - /** Job manager service type. */ + /** + * Job manager service type. + */ protected enum JobManagerServiceType { JOB_MANAGER_PAGE("JOB_MANAGER_PAGE"); private String serviceTypeKey; @@ -116,17 +121,17 @@ public class NotebookServer extends WebSocketServlet } } + // private HashSet<String> collaborativeModeList = new HashSet<>(); - private Boolean collaborativeModeEnable = - ZeppelinConfiguration.create().isZeppelinNotebookCollaborativeModeEnable(); + private Boolean collaborativeModeEnable = ZeppelinConfiguration + .create() + .isZeppelinNotebookCollaborativeModeEnable(); private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - private static Gson gson = - new GsonBuilder() - .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") - .registerTypeAdapter(Date.class, new NotebookImportDeserializer()) - .setPrettyPrinting() - .registerTypeAdapterFactory(Input.TypeAdapterFactory) - .create(); + private static Gson gson = new GsonBuilder() + .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ") + .registerTypeAdapter(Date.class, new NotebookImportDeserializer()) + .setPrettyPrinting() + .registerTypeAdapterFactory(Input.TypeAdapterFactory).create(); private ConnectionManager connectionManager; private NotebookService notebookService; @@ -135,6 +140,7 @@ public class NotebookServer extends WebSocketServlet private ExecutorService executorService = Executors.newFixedThreadPool(10); + public NotebookServer() { this.connectionManager = new ConnectionManager(); } @@ -190,41 +196,28 @@ public class NotebookServer extends WebSocketServlet try { Message messagereceived = deserializeMessage(msg); if (messagereceived.op != OP.PING) { - LOG.debug( - "RECEIVE: " - + messagereceived.op - + ", RECEIVE PRINCIPAL: " - + messagereceived.principal - + ", RECEIVE TICKET: " - + messagereceived.ticket - + ", RECEIVE ROLES: " - + messagereceived.roles - + ", RECEIVE DATA: " - + messagereceived.data); + LOG.debug("RECEIVE: " + messagereceived.op + + ", RECEIVE PRINCIPAL: " + messagereceived.principal + + ", RECEIVE TICKET: " + messagereceived.ticket + + ", RECEIVE ROLES: " + messagereceived.roles + + ", RECEIVE DATA: " + messagereceived.data); } if (LOG.isTraceEnabled()) { LOG.trace("RECEIVE MSG = " + messagereceived); } String ticket = TicketContainer.instance.getTicket(messagereceived.principal); - if (ticket != null - && (messagereceived.ticket == null || !ticket.equals(messagereceived.ticket))) { + if (ticket != null && + (messagereceived.ticket == null || !ticket.equals(messagereceived.ticket))) { /* not to pollute logs, log instead of exception */ if (StringUtils.isEmpty(messagereceived.ticket)) { - LOG.debug( - "{} message: invalid ticket {} != {}", - messagereceived.op, - messagereceived.ticket, - ticket); + LOG.debug("{} message: invalid ticket {} != {}", messagereceived.op, + messagereceived.ticket, ticket); } else { if (!messagereceived.op.equals(OP.PING)) { - conn.send( - serializeMessage( - new Message(OP.SESSION_LOGOUT) - .put( - "info", - "Your ticket is invalid possibly due to server restart. " - + "Please login again."))); + conn.send(serializeMessage(new Message(OP.SESSION_LOGOUT).put("info", + "Your ticket is invalid possibly due to server restart. " + + "Please login again."))); } } return; @@ -240,7 +233,8 @@ public class NotebookServer extends WebSocketServlet userAndRoles.add(messagereceived.principal); if (!messagereceived.roles.equals("")) { HashSet<String> roles = - gson.fromJson(messagereceived.roles, new TypeToken<HashSet<String>>() {}.getType()); + gson.fromJson(messagereceived.roles, new TypeToken<HashSet<String>>() { + }.getType()); if (roles != null) { userAndRoles.addAll(roles); } @@ -249,8 +243,8 @@ public class NotebookServer extends WebSocketServlet connectionManager.addUserConnection(messagereceived.principal, conn); } AuthenticationInfo subject = - new AuthenticationInfo( - messagereceived.principal, messagereceived.roles, messagereceived.ticket); + new AuthenticationInfo(messagereceived.principal, messagereceived.roles, + messagereceived.ticket); // Lets be elegant here switch (messagereceived.op) { @@ -348,7 +342,7 @@ public class NotebookServer extends WebSocketServlet completion(conn, messagereceived); break; case PING: - break; // do nothing + break; //do nothing case ANGULAR_OBJECT_UPDATED: angularObjectUpdated(conn, userAndRoles, notebook, messagereceived); break; @@ -438,54 +432,44 @@ public class NotebookServer extends WebSocketServlet public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException { connectionManager.addNoteConnection(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); - getJobManagerService() - .getNoteJobInfoByUnixTime( - 0, - getServiceContext(fromMessage), - new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) { - @Override - public void onSuccess( - List<JobManagerService.NoteJobInfo> notesJobInfo, ServiceContext context) - throws IOException { - super.onSuccess(notesJobInfo, context); - Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", notesJobInfo); - conn.send( - serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response))); - } + getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage), + new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) { + @Override + public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesJobInfo, context); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesJobInfo); + conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response))); + } - @Override - public void onFailure(Exception ex, ServiceContext context) throws IOException { - LOG.warn(ex.getMessage()); - } - }); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + LOG.warn(ex.getMessage()); + } + }); } public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOException { - getJobManagerService() - .getNoteJobInfoByUnixTime( - lastUpdateUnixTime, - null, - new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(null) { - @Override - public void onSuccess( - List<JobManagerService.NoteJobInfo> notesJobInfo, ServiceContext context) - throws IOException { - super.onSuccess(notesJobInfo, context); - Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", notesJobInfo); - connectionManager.broadcast( - JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), - new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); - } + getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, null, + new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(null) { + @Override + public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesJobInfo, context); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesJobInfo); + connectionManager.broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + } - @Override - public void onFailure(Exception ex, ServiceContext context) throws IOException { - LOG.warn(ex.getMessage()); - } - }); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + LOG.warn(ex.getMessage()); + } + }); } public void unsubscribeNoteJobInfo(NotebookSocket conn) { @@ -496,13 +480,13 @@ public class NotebookServer extends WebSocketServlet String noteId = (String) fromMessage.data.get("noteId"); List<InterpreterSettingsList> settingList = InterpreterBindingUtils.getInterpreterBindings(notebook(), noteId); - conn.send( - serializeMessage( - new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList))); + conn.send(serializeMessage( + new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList))); } - public List<Map<String, String>> generateNotesInfo( - boolean needsReload, AuthenticationInfo subject, Set<String> userAndRoles) { + public List<Map<String, String>> generateNotesInfo(boolean needsReload, + AuthenticationInfo subject, + Set<String> userAndRoles) { Notebook notebook = notebook(); ZeppelinConfiguration conf = notebook.getConf(); @@ -545,18 +529,17 @@ public class NotebookServer extends WebSocketServlet if (note.isPersonalizedMode()) { broadcastParagraphs(p.getUserParagraphMap(), p); } else { - connectionManager.broadcast( - note.getId(), + connectionManager.broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", new ParagraphWithRuntimeInfo(p))); } } - public void broadcastParagraphs( - Map<String, Paragraph> userParagraphMap, Paragraph defaultParagraph) { + public void broadcastParagraphs(Map<String, Paragraph> userParagraphMap, + Paragraph defaultParagraph) { if (null != userParagraphMap) { for (String user : userParagraphMap.keySet()) { - connectionManager.multicastToUser( - user, new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user))); + connectionManager.multicastToUser(user, + new Message(OP.PARAGRAPH).put("paragraph", userParagraphMap.get(user))); } } } @@ -564,8 +547,7 @@ public class NotebookServer extends WebSocketServlet private void broadcastNewParagraph(Note note, Paragraph para) { LOG.info("Broadcasting paragraph on run call instead of note."); int paraIndex = note.getParagraphs().indexOf(para); - connectionManager.broadcast( - note.getId(), + connectionManager.broadcast(note.getId(), new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex)); } @@ -573,152 +555,125 @@ public class NotebookServer extends WebSocketServlet if (subject == null) { subject = new AuthenticationInfo(StringUtils.EMPTY); } - // send first to requesting user + //send first to requesting user List<Map<String, String>> notesInfo = generateNotesInfo(false, subject, userAndRoles); - connectionManager.multicastToUser( - subject.getUser(), new Message(OP.NOTES_INFO).put("notes", notesInfo)); - // to others afterwards + connectionManager.multicastToUser(subject.getUser(), + new Message(OP.NOTES_INFO).put("notes", notesInfo)); + //to others afterwards connectionManager.broadcastNoteListExcept(notesInfo, subject); } public void listNotes(NotebookSocket conn, Message message) throws IOException { - getNotebookService() - .listNotes( - false, - getServiceContext(message), - new WebSocketServiceCallback<List<Map<String, String>>>(conn) { - @Override - public void onSuccess(List<Map<String, String>> notesInfo, ServiceContext context) - throws IOException { - super.onSuccess(notesInfo, context); - connectionManager.unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); - } - }); + getNotebookService().listNotes(false, getServiceContext(message), + new WebSocketServiceCallback<List<Map<String, String>>>(conn) { + @Override + public void onSuccess(List<Map<String, String>> notesInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesInfo, context); + connectionManager.unicast(new Message(OP.NOTES_INFO).put("notes", notesInfo), conn); + } + }); } public void broadcastReloadedNoteList(NotebookSocket conn, ServiceContext context) throws IOException { - getNotebookService() - .listNotes( - false, - context, - new WebSocketServiceCallback<List<Map<String, String>>>(conn) { - @Override - public void onSuccess(List<Map<String, String>> notesInfo, ServiceContext context) - throws IOException { - super.onSuccess(notesInfo, context); - connectionManager.multicastToUser( - context.getAutheInfo().getUser(), - new Message(OP.NOTES_INFO).put("notes", notesInfo)); - // to others afterwards - connectionManager.broadcastNoteListExcept(notesInfo, context.getAutheInfo()); - } - }); + getNotebookService().listNotes(false, context, + new WebSocketServiceCallback<List<Map<String, String>>>(conn) { + @Override + public void onSuccess(List<Map<String, String>> notesInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesInfo, context); + connectionManager.multicastToUser(context.getAutheInfo().getUser(), + new Message(OP.NOTES_INFO).put("notes", notesInfo)); + //to others afterwards + connectionManager.broadcastNoteListExcept(notesInfo, context.getAutheInfo()); + } + }); } - void permissionError( - NotebookSocket conn, - String op, - String userName, - Set<String> userAndRoles, - Set<String> allowed) - throws IOException { + void permissionError(NotebookSocket conn, String op, String userName, Set<String> userAndRoles, + Set<String> allowed) throws IOException { LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed); - conn.send( - serializeMessage( - new Message(OP.AUTH_INFO) - .put( - "info", - "Insufficient privileges to " - + op - + " note.\n\n" - + "Allowed users or roles: " - + allowed.toString() - + "\n\n" - + "But the user " - + userName - + " belongs to: " - + userAndRoles.toString()))); - } - - /** @return false if user doesn't have writer permission for this paragraph */ - private boolean hasParagraphWriterPermission( - NotebookSocket conn, - Notebook notebook, - String noteId, - HashSet<String> userAndRoles, - String principal, - String op) + conn.send(serializeMessage(new Message(OP.AUTH_INFO).put("info", + "Insufficient privileges to " + op + " note.\n\n" + "Allowed users or roles: " + allowed + .toString() + "\n\n" + "But the user " + userName + " belongs to: " + userAndRoles + .toString()))); + } + + + /** + * @return false if user doesn't have writer permission for this paragraph + */ + private boolean hasParagraphWriterPermission(NotebookSocket conn, Notebook notebook, + String noteId, HashSet<String> userAndRoles, + String principal, String op) throws IOException { NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, op, principal, userAndRoles, notebookAuthorization.getOwners(noteId)); + permissionError(conn, op, principal, userAndRoles, + notebookAuthorization.getOwners(noteId)); return false; } return true; } - /** @return false if user doesn't have owner permission for this paragraph */ - private boolean hasParagraphOwnerPermission( - NotebookSocket conn, - Notebook notebook, - String noteId, - HashSet<String> userAndRoles, - String principal, - String op) - throws IOException { + /** + * @return false if user doesn't have owner permission for this paragraph + */ + private boolean hasParagraphOwnerPermission(NotebookSocket conn, Notebook notebook, String noteId, + HashSet<String> userAndRoles, String principal, + String op) throws IOException { NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { - permissionError(conn, op, principal, userAndRoles, notebookAuthorization.getOwners(noteId)); + permissionError(conn, op, principal, userAndRoles, + notebookAuthorization.getOwners(noteId)); return false; } return true; } - private void getNote(NotebookSocket conn, Message fromMessage) throws IOException { + private void getNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; } - getNotebookService() - .getNote( - noteId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - connectionManager.addNoteConnection(note.getId(), conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); - sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); - } - }); - } - - private void getHomeNote(NotebookSocket conn, Message fromMessage) throws IOException { - - getNotebookService() - .getHomeNote( - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - if (note != null) { - connectionManager.addNoteConnection(note.getId(), conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); - sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); - } else { - connectionManager.removeConnectionFromAllNote(conn); - conn.send(serializeMessage(new Message(OP.NOTE).put("note", null))); - } - } - }); + getNotebookService().getNote(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + connectionManager.addNoteConnection(note.getId(), conn); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); + } + }); + } + + private void getHomeNote(NotebookSocket conn, + Message fromMessage) throws IOException { + + getNotebookService().getHomeNote(getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + if (note != null) { + connectionManager.addNoteConnection(note.getId(), conn); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); + sendAllAngularObjects(note, context.getAutheInfo().getUser(), conn); + } else { + connectionManager.removeConnectionFromAllNote(conn); + conn.send(serializeMessage(new Message(OP.NOTE).put("note", null))); + } + } + }); } - private void updateNote(NotebookSocket conn, Message fromMessage) throws IOException { + private void updateNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); @@ -729,29 +684,20 @@ public class NotebookServer extends WebSocketServlet return; } - getNotebookService() - .updateNote( - noteId, - name, - config, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - connectionManager.broadcast( - note.getId(), - new Message(OP.NOTE_UPDATED) - .put("name", name) - .put("config", config) - .put("info", note.getInfo())); - broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); - } - }); + getNotebookService().updateNote(noteId, name, config, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + connectionManager.broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name) + .put("config", config) + .put("info", note.getInfo())); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } - private void updatePersonalizedMode( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws IOException { + private void updatePersonalizedMode(NotebookSocket conn, HashSet<String> userAndRoles, + Notebook notebook, Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); String personalized = (String) fromMessage.get("personalized"); boolean isPersonalized = personalized.equals("true") ? true : false; @@ -760,8 +706,8 @@ public class NotebookServer extends WebSocketServlet return; } - if (!hasParagraphOwnerPermission( - conn, notebook, noteId, userAndRoles, fromMessage.principal, "persoanlized")) { + if (!hasParagraphOwnerPermission(conn, notebook, noteId, + userAndRoles, fromMessage.principal, "persoanlized")) { return; } @@ -774,40 +720,31 @@ public class NotebookServer extends WebSocketServlet } } - private void renameNote(NotebookSocket conn, Message fromMessage) throws IOException { + private void renameNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); if (noteId == null) { return; } - getNotebookService() - .renameNote( - noteId, - name, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - broadcastNote(note); - broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); - } - }); + getNotebookService().renameNote(noteId, name, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + broadcastNote(note); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } - private void renameFolder( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws IOException { + private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { renameFolder(conn, userAndRoles, notebook, fromMessage, "rename"); } - private void renameFolder( - NotebookSocket conn, - HashSet<String> userAndRoles, - Notebook notebook, - Message fromMessage, - String op) - throws IOException { + private void renameFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, + Message fromMessage, String op) throws IOException { String oldFolderId = (String) fromMessage.get("id"); String newFolderId = (String) fromMessage.get("name"); @@ -817,13 +754,8 @@ public class NotebookServer extends WebSocketServlet for (Note note : notebook.getNotesUnderFolder(oldFolderId)) { String noteId = note.getId(); - if (!hasParagraphOwnerPermission( - conn, - notebook, - noteId, - userAndRoles, - fromMessage.principal, - op + " folder of '" + note.getName() + "'")) { + if (!hasParagraphOwnerPermission(conn, notebook, noteId, + userAndRoles, fromMessage.principal, op + " folder of '" + note.getName() + "'")) { return; } } @@ -843,57 +775,47 @@ public class NotebookServer extends WebSocketServlet } } - private void createNote(NotebookSocket conn, Message message) throws IOException { + private void createNote(NotebookSocket conn, + Message message) throws IOException { String noteName = (String) message.get("name"); String defaultInterpreterGroup = (String) message.get("defaultInterpreterGroup"); - getNotebookService() - .createNote( - noteName, - defaultInterpreterGroup, - getServiceContext(message), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - connectionManager.addNoteConnection(note.getId(), conn); - conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note))); - broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); - } + getNotebookService().createNote(noteName, defaultInterpreterGroup, getServiceContext(message), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + connectionManager.addNoteConnection(note.getId(), conn); + conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", note))); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } - @Override - public void onFailure(Exception ex, ServiceContext context) throws IOException { - super.onFailure(ex, context); - conn.send( - serializeMessage( - new Message(OP.ERROR_INFO) - .put( - "info", - "Failed to create note.\n" + ExceptionUtils.getMessage(ex)))); - } - }); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + super.onFailure(ex, context); + conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", + "Failed to create note.\n" + ExceptionUtils.getMessage(ex)))); + } + }); } - private void deleteNote(NotebookSocket conn, Message fromMessage) throws IOException { + private void deleteNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); - getNotebookService() - .removeNote( - noteId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<String>(conn) { - @Override - public void onSuccess(String message, ServiceContext context) throws IOException { - super.onSuccess(message, context); - connectionManager.removeNoteConnection(noteId); - broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); - } - }); + getNotebookService().removeNote(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<String>(conn) { + @Override + public void onSuccess(String message, ServiceContext context) throws IOException { + super.onSuccess(message, context); + connectionManager.removeNoteConnection(noteId); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } - private void removeFolder( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws IOException { + private void removeFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { String folderId = (String) fromMessage.get("id"); if (folderId == null) { return; @@ -903,13 +825,8 @@ public class NotebookServer extends WebSocketServlet for (Note note : notes) { String noteId = note.getId(); - if (!hasParagraphOwnerPermission( - conn, - notebook, - noteId, - userAndRoles, - fromMessage.principal, - "remove folder of '" + note.getName() + "'")) { + if (!hasParagraphOwnerPermission(conn, notebook, noteId, + userAndRoles, fromMessage.principal, "remove folder of '" + note.getName() + "'")) { return; } } @@ -922,9 +839,8 @@ public class NotebookServer extends WebSocketServlet broadcastNoteList(subject, userAndRoles); } - private void moveNoteToTrash( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { + private void moveNoteToTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, + Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -945,8 +861,8 @@ public class NotebookServer extends WebSocketServlet } } - private void moveFolderToTrash( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) + private void moveFolderToTrash(NotebookSocket conn, HashSet<String> userAndRoles, + Notebook notebook, Message fromMessage) throws SchedulerException, IOException { String folderId = (String) fromMessage.get("id"); if (folderId == null) { @@ -975,15 +891,16 @@ public class NotebookServer extends WebSocketServlet } } - private void restoreNote(NotebookSocket conn, Message fromMessage) throws IOException { + private void restoreNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); - getNotebookService() - .restoreNote(noteId, getServiceContext(fromMessage), new WebSocketServiceCallback(conn)); + getNotebookService().restoreNote(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback(conn)); + } - private void restoreFolder( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws IOException { + private void restoreFolder(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { String folderId = (String) fromMessage.get("id"); if (folderId == null) { @@ -994,7 +911,7 @@ public class NotebookServer extends WebSocketServlet if (folder != null && folder.isTrash()) { String restoreName = folder.getId().replaceFirst(Folder.TRASH_FOLDER_ID + "/", "").trim(); - // restore cron for each paragraph + //restore cron for each paragraph List<Note> noteList = folder.getNotesRecursively(); for (Note note : noteList) { Map<String, Object> config = note.getConfig(); @@ -1013,9 +930,8 @@ public class NotebookServer extends WebSocketServlet } } - private void restoreAll( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws IOException { + private void restoreAll(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { Folder trashFolder = notebook.getFolder(Folder.TRASH_FOLDER_ID); if (trashFolder != null) { fromMessage.data = new HashMap<>(); @@ -1025,15 +941,15 @@ public class NotebookServer extends WebSocketServlet } } - private void emptyTrash( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { + private void emptyTrash(NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, + Message fromMessage) throws SchedulerException, IOException { fromMessage.data = new HashMap<>(); fromMessage.put("id", Folder.TRASH_FOLDER_ID); removeFolder(conn, userAndRoles, notebook, fromMessage); } - private void updateParagraph(NotebookSocket conn, Message fromMessage) throws IOException { + private void updateParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String noteId = connectionManager.getAssociatedNoteId(conn); if (noteId == null) { @@ -1044,33 +960,25 @@ public class NotebookServer extends WebSocketServlet Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); - getNotebookService() - .updateParagraph( - noteId, - paragraphId, - title, - text, - params, - config, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Paragraph>(conn) { - @Override - public void onSuccess(Paragraph p, ServiceContext context) throws IOException { - super.onSuccess(p, context); - if (p.getNote().isPersonalizedMode()) { - Map<String, Paragraph> userParagraphMap = - p.getNote().getParagraph(paragraphId).getUserParagraphMap(); - broadcastParagraphs(userParagraphMap, p); - } else { - broadcastParagraph(p.getNote(), p); - } - } - }); + getNotebookService().updateParagraph(noteId, paragraphId, title, text, params, config, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + if (p.getNote().isPersonalizedMode()) { + Map<String, Paragraph> userParagraphMap = + p.getNote().getParagraph(paragraphId).getUserParagraphMap(); + broadcastParagraphs(userParagraphMap, p); + } else { + broadcastParagraph(p.getNote(), p); + } + } + }); } - private void patchParagraph( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws IOException { + private void patchParagraph(NotebookSocket conn, HashSet<String> userAndRoles, + Notebook notebook, Message fromMessage) throws IOException { if (!collaborativeModeEnable) { return; } @@ -1087,8 +995,8 @@ public class NotebookServer extends WebSocketServlet } } - if (!hasParagraphWriterPermission( - conn, notebook, noteId, userAndRoles, fromMessage.principal, "write")) { + if (!hasParagraphWriterPermission(conn, notebook, noteId, + userAndRoles, fromMessage.principal, "write")) { return; } @@ -1120,153 +1028,131 @@ public class NotebookServer extends WebSocketServlet String paragraphText = p.getText() == null ? "" : p.getText(); paragraphText = (String) dmp.patchApply(patches, paragraphText)[0]; p.setText(paragraphText); - Message message = - new Message(OP.PATCH_PARAGRAPH).put("patch", patchText).put("paragraphId", p.getId()); + Message message = new Message(OP.PATCH_PARAGRAPH).put("patch", patchText) + .put("paragraphId", p.getId()); connectionManager.broadcastExcept(note.getId(), message, conn); } - private void cloneNote(NotebookSocket conn, Message fromMessage) throws IOException { + private void cloneNote(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = connectionManager.getAssociatedNoteId(conn); String name = (String) fromMessage.get("name"); - getNotebookService() - .cloneNote( - noteId, - name, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note newNote, ServiceContext context) throws IOException { - super.onSuccess(newNote, context); - connectionManager.addNoteConnection(newNote.getId(), conn); - conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote))); - broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); - } - }); + getNotebookService().cloneNote(noteId, name, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note newNote, ServiceContext context) throws IOException { + super.onSuccess(newNote, context); + connectionManager.addNoteConnection(newNote.getId(), conn); + conn.send(serializeMessage(new Message(OP.NEW_NOTE).put("note", newNote))); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } + }); } - private void clearAllParagraphOutput(NotebookSocket conn, Message fromMessage) - throws IOException { + private void clearAllParagraphOutput(NotebookSocket conn, + Message fromMessage) throws IOException { final String noteId = (String) fromMessage.get("id"); - getNotebookService() - .clearAllParagraphOutput( - noteId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - broadcastNote(note); - } - }); + getNotebookService().clearAllParagraphOutput(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + broadcastNote(note); + } + }); } protected Note importNote(NotebookSocket conn, Message fromMessage) throws IOException { String noteName = (String) ((Map) fromMessage.get("note")).get("name"); String noteJson = gson.toJson(fromMessage.get("note")); - Note note = - getNotebookService() - .importNote( - noteName, - noteJson, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - try { - broadcastNote(note); - broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); - } catch (NullPointerException e) { - // TODO(zjffdu) remove this try catch. This is only for test of - // NotebookServerTest#testImportNotebook - } - } - }); + Note note = getNotebookService().importNote(noteName, noteJson, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + try { + broadcastNote(note); + broadcastNoteList(context.getAutheInfo(), context.getUserAndRoles()); + } catch (NullPointerException e) { + // TODO(zjffdu) remove this try catch. This is only for test of + // NotebookServerTest#testImportNotebook + } + } + }); return note; } - private void removeParagraph(NotebookSocket conn, Message fromMessage) throws IOException { + private void removeParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); String noteId = connectionManager.getAssociatedNoteId(conn); - getNotebookService() - .removeParagraph( - noteId, - paragraphId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Paragraph>(conn) { - @Override - public void onSuccess(Paragraph p, ServiceContext context) throws IOException { - super.onSuccess(p, context); - connectionManager.broadcast( - p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED).put("id", p.getId())); - } - }); + getNotebookService().removeParagraph(noteId, paragraphId, + getServiceContext(fromMessage), new WebSocketServiceCallback<Paragraph>(conn){ + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + connectionManager.broadcast(p.getNote().getId(), new Message(OP.PARAGRAPH_REMOVED). + put("id", p.getId())); + } + }); } - private void clearParagraphOutput(NotebookSocket conn, Message fromMessage) throws IOException { + private void clearParagraphOutput(NotebookSocket conn, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); String noteId = connectionManager.getAssociatedNoteId(conn); - getNotebookService() - .clearParagraphOutput( - noteId, - paragraphId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Paragraph>(conn) { - @Override - public void onSuccess(Paragraph p, ServiceContext context) throws IOException { - super.onSuccess(p, context); - if (p.getNote().isPersonalizedMode()) { - connectionManager.unicastParagraph( - p.getNote(), p, context.getAutheInfo().getUser()); - } else { - broadcastParagraph(p.getNote(), p); - } - } - }); + getNotebookService().clearParagraphOutput(noteId, paragraphId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + if (p.getNote().isPersonalizedMode()) { + connectionManager.unicastParagraph(p.getNote(), p, context.getAutheInfo().getUser()); + } else { + broadcastParagraph(p.getNote(), p); + } + } + }); } - private void completion(NotebookSocket conn, Message fromMessage) throws IOException { + private void completion(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = connectionManager.getAssociatedNoteId(conn); String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); - getNotebookService() - .completion( - noteId, - paragraphId, - buffer, - cursor, - getServiceContext(fromMessage), - new WebSocketServiceCallback<List<InterpreterCompletion>>(conn) { - @Override - public void onSuccess(List<InterpreterCompletion> completions, ServiceContext context) - throws IOException { - super.onSuccess(completions, context); - Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); - resp.put("completions", completions); - conn.send(serializeMessage(resp)); - } + getNotebookService().completion(noteId, paragraphId, buffer, cursor, + getServiceContext(fromMessage), + new WebSocketServiceCallback<List<InterpreterCompletion>>(conn) { + @Override + public void onSuccess(List<InterpreterCompletion> completions, ServiceContext context) + throws IOException { + super.onSuccess(completions, context); + Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); + resp.put("completions", completions); + conn.send(serializeMessage(resp)); + } - @Override - public void onFailure(Exception ex, ServiceContext context) throws IOException { - super.onFailure(ex, context); - Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); - resp.put("completions", new ArrayList<>()); - conn.send(serializeMessage(resp)); - } - }); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + super.onFailure(ex, context); + Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); + resp.put("completions", new ArrayList<>()); + conn.send(serializeMessage(resp)); + } + }); } /** * When angular object updated from client. * - * @param conn the web socket. - * @param notebook the notebook. + * @param conn the web socket. + * @param notebook the notebook. * @param fromMessage the message. */ - private void angularObjectUpdated( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) { + private void angularObjectUpdated(NotebookSocket conn, HashSet<String> userAndRoles, + Notebook notebook, Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); String paragraphId = (String) fromMessage.get("paragraphId"); String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); @@ -1284,7 +1170,8 @@ public class NotebookServer extends WebSocketServlet if (setting.getInterpreterGroup(user, note.getId()) == null) { continue; } - if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId()).getId())) { + if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId()) + .getId())) { AngularObjectRegistry angularObjectRegistry = setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); @@ -1326,39 +1213,31 @@ public class NotebookServer extends WebSocketServlet if (setting.getInterpreterGroup(user, n.getId()) == null) { continue; } - if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId()).getId())) { + if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId()) + .getId())) { AngularObjectRegistry angularObjectRegistry = setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); - connectionManager.broadcastExcept( - n.getId(), - new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.getId()) - .put("paragraphId", ao.getParagraphId()), - conn); + connectionManager.broadcastExcept(n.getId(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId()) + .put("paragraphId", ao.getParagraphId()), conn); } } } } else { // broadcast to all web session for the note - connectionManager.broadcastExcept( - note.getId(), - new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.getId()) - .put("paragraphId", ao.getParagraphId()), - conn); + connectionManager.broadcastExcept(note.getId(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) + .put("paragraphId", ao.getParagraphId()), conn); } } /** - * Push the given Angular variable to the target interpreter angular registry given a noteId and a - * paragraph id. + * Push the given Angular variable to the target interpreter angular registry given a noteId + * and a paragraph id. */ - protected void angularObjectClientBind( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws Exception { + protected void angularObjectClientBind(NotebookSocket conn, HashSet<String> userAndRoles, + Notebook notebook, Message fromMessage) throws Exception { String noteId = fromMessage.getType("noteId"); String varName = fromMessage.getType("name"); Object varValue = fromMessage.get("value"); @@ -1377,12 +1256,12 @@ public class NotebookServer extends WebSocketServlet if (registry instanceof RemoteAngularObjectRegistry) { RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry; - pushAngularObjectToRemoteRegistry( - noteId, paragraphId, varName, varValue, remoteRegistry, interpreterGroup.getId(), conn); + pushAngularObjectToRemoteRegistry(noteId, paragraphId, varName, varValue, remoteRegistry, + interpreterGroup.getId(), conn); } else { - pushAngularObjectToLocalRepo( - noteId, paragraphId, varName, varValue, registry, interpreterGroup.getId(), conn); + pushAngularObjectToLocalRepo(noteId, paragraphId, varName, varValue, registry, + interpreterGroup.getId(), conn); } } } @@ -1391,8 +1270,8 @@ public class NotebookServer extends WebSocketServlet * Remove the given Angular variable to the target interpreter(s) angular registry given a noteId * and an optional list of paragraph id(s). */ - protected void angularObjectClientUnbind( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) + protected void angularObjectClientUnbind(NotebookSocket conn, HashSet<String> userAndRoles, + Notebook notebook, Message fromMessage) throws Exception { String noteId = fromMessage.getType("noteId"); String varName = fromMessage.getType("name"); @@ -1411,11 +1290,11 @@ public class NotebookServer extends WebSocketServlet if (registry instanceof RemoteAngularObjectRegistry) { RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry; - removeAngularFromRemoteRegistry( - noteId, paragraphId, varName, remoteRegistry, interpreterGroup.getId(), conn); + removeAngularFromRemoteRegistry(noteId, paragraphId, varName, remoteRegistry, + interpreterGroup.getId(), conn); } else { - removeAngularObjectFromLocalRepo( - noteId, paragraphId, varName, registry, interpreterGroup.getId(), conn); + removeAngularObjectFromLocalRepo(noteId, paragraphId, varName, registry, + interpreterGroup.getId(), conn); } } } @@ -1429,54 +1308,36 @@ public class NotebookServer extends WebSocketServlet return paragraph.getBindedInterpreter().getInterpreterGroup(); } - private void pushAngularObjectToRemoteRegistry( - String noteId, - String paragraphId, - String varName, - Object varValue, - RemoteAngularObjectRegistry remoteRegistry, - String interpreterGroupId, - NotebookSocket conn) { + private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, String varName, + Object varValue, + RemoteAngularObjectRegistry remoteRegistry, + String interpreterGroupId, + NotebookSocket conn) { final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId); - connectionManager.broadcastExcept( - noteId, - new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); - } - - private void removeAngularFromRemoteRegistry( - String noteId, - String paragraphId, - String varName, - RemoteAngularObjectRegistry remoteRegistry, - String interpreterGroupId, - NotebookSocket conn) { + connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); + } + + private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, String varName, + RemoteAngularObjectRegistry remoteRegistry, + String interpreterGroupId, + NotebookSocket conn) { final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId); - connectionManager.broadcastExcept( - noteId, - new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); - } - - private void pushAngularObjectToLocalRepo( - String noteId, - String paragraphId, - String varName, - Object varValue, - AngularObjectRegistry registry, - String interpreterGroupId, - NotebookSocket conn) { + connectionManager.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE) + .put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); + } + + private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName, + Object varValue, AngularObjectRegistry registry, + String interpreterGroupId, + NotebookSocket conn) { AngularObject angularObject = registry.get(varName, noteId, paragraphId); if (angularObject == null) { angularObject = registry.add(varName, varValue, noteId, paragraphId); @@ -1484,58 +1345,43 @@ public class NotebookServer extends WebSocketServlet angularObject.set(varValue, true); } - connectionManager.broadcastExcept( - noteId, - new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", angularObject) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); + connectionManager.broadcastExcept(noteId, + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject) + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); } - private void removeAngularObjectFromLocalRepo( - String noteId, - String paragraphId, - String varName, - AngularObjectRegistry registry, - String interpreterGroupId, - NotebookSocket conn) { + private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName, + AngularObjectRegistry registry, + String interpreterGroupId, NotebookSocket conn) { final AngularObject removed = registry.remove(varName, noteId, paragraphId); if (removed != null) { - connectionManager.broadcastExcept( - noteId, - new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("angularObject", removed) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); + connectionManager.broadcastExcept(noteId, + new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed) + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); } } - private void moveParagraph(NotebookSocket conn, Message fromMessage) throws IOException { + private void moveParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); String noteId = connectionManager.getAssociatedNoteId(conn); - getNotebookService() - .moveParagraph( - noteId, - paragraphId, - newIndex, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Paragraph>(conn) { - @Override - public void onSuccess(Paragraph result, ServiceContext context) throws IOException { - super.onSuccess(result, context); - connectionManager.broadcast( - result.getNote().getId(), - new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex)); - } - }); + getNotebookService().moveParagraph(noteId, paragraphId, newIndex, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph result, ServiceContext context) throws IOException { + super.onSuccess(result, context); + connectionManager.broadcast(result.getNote().getId(), + new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex)); + } + }); } - private String insertParagraph(NotebookSocket conn, Message fromMessage) throws IOException { + private String insertParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); String noteId = connectionManager.getAssociatedNoteId(conn); Map<String, Object> config; @@ -1545,25 +1391,21 @@ public class NotebookServer extends WebSocketServlet config = new HashMap<>(); } - Paragraph newPara = - getNotebookService() - .insertParagraph( - noteId, - index, - config, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Paragraph>(conn) { - @Override - public void onSuccess(Paragraph p, ServiceContext context) throws IOException { - super.onSuccess(p, context); - broadcastNewParagraph(p.getNote(), p); - } - }); + Paragraph newPara = getNotebookService().insertParagraph(noteId, index, config, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + broadcastNewParagraph(p.getNote(), p); + } + }); return newPara.getId(); } - private void copyParagraph(NotebookSocket conn, Message fromMessage) throws IOException { + private void copyParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { String newParaId = insertParagraph(conn, fromMessage); if (newParaId == null) { @@ -1577,32 +1419,24 @@ public class NotebookServer extends WebSocketServlet private void cancelParagraph(NotebookSocket conn, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); String noteId = connectionManager.getAssociatedNoteId(conn); - getNotebookService() - .cancelParagraph( - noteId, - paragraphId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<>(conn)); + getNotebookService().cancelParagraph(noteId, paragraphId, getServiceContext(fromMessage), + new WebSocketServiceCallback<>(conn)); } - private void runAllParagraphs(NotebookSocket conn, Message fromMessage) throws IOException { + private void runAllParagraphs(NotebookSocket conn, + Message fromMessage) throws IOException { final String noteId = (String) fromMessage.get("noteId"); List<Map<String, Object>> paragraphs = - gson.fromJson( - String.valueOf(fromMessage.data.get("paragraphs")), - new TypeToken<List<Map<String, Object>>>() {}.getType()); + gson.fromJson(String.valueOf(fromMessage.data.get("paragraphs")), + new TypeToken<List<Map<String, Object>>>() { + }.getType()); - getNotebookService() - .runAllParagraphs( - noteId, - paragraphs, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Paragraph>(conn)); + getNotebookService().runAllParagraphs(noteId, paragraphs, getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn)); } - private void broadcastSpellExecution( - NotebookSocket conn, HashSet<String> userAndRoles, Notebook notebook, Message fromMessage) - throws IOException { + private void broadcastSpellExecution(NotebookSocket conn, HashSet<String> userAndRoles, + Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -1610,8 +1444,8 @@ public class NotebookServer extends WebSocketServlet String noteId = connectionManager.getAssociatedNoteId(conn); - if (!hasParagraphWriterPermission( - conn, notebook, noteId, userAndRoles, fromMessage.principal, "write")) { + if (!hasParagraphWriterPermission(conn, notebook, noteId, + userAndRoles, fromMessage.principal, "write")) { return; } @@ -1622,8 +1456,8 @@ public class NotebookServer extends WebSocketServlet Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); final Note note = notebook.getNote(noteId); - Paragraph p = - setParagraphUsingMessage(note, fromMessage, paragraphId, text, title, params, config); + Paragraph p = setParagraphUsingMessage(note, fromMessage, paragraphId, + text, title, params, config); p.setResult((InterpreterResult) fromMessage.get("results")); p.setErrorMessage((String) fromMessage.get("errorMessage")); p.setStatusWithoutNotification(status); @@ -1651,64 +1485,56 @@ public class NotebookServer extends WebSocketServlet } // broadcast to other clients only - connectionManager.broadcastExcept( - note.getId(), new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn); + connectionManager.broadcastExcept(note.getId(), + new Message(OP.RUN_PARAGRAPH_USING_SPELL).put("paragraph", p), conn); } - private void runParagraph(NotebookSocket conn, Message fromMessage) throws IOException { + private void runParagraph(NotebookSocket conn, + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String noteId = connectionManager.getAssociatedNoteId(conn); String text = (String) fromMessage.get("paragraph"); String title = (String) fromMessage.get("title"); Map<String, Object> params = (Map<String, Object>) fromMessage.get("params"); Map<String, Object> config = (Map<String, Object>) fromMessage.get("config"); - getNotebookService() - .runParagraph( - noteId, - paragraphId, - title, - text, - params, - config, - false, - false, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Paragraph>(conn) { - @Override - public void onSuccess(Paragraph p, ServiceContext context) throws IOException { - super.onSuccess(p, context); - if (p.getNote().isPersonalizedMode()) { - Paragraph p2 = - p.getNote() - .clearPersonalizedParagraphOutput( - paragraphId, context.getAutheInfo().getUser()); - connectionManager.unicastParagraph( - p.getNote(), p2, context.getAutheInfo().getUser()); - } - - // if it's the last paragraph and not empty, let's add a new one - boolean isTheLastParagraph = p.getNote().isLastParagraph(paragraphId); - if (!(Strings.isNullOrEmpty(p.getText()) - || Strings.isNullOrEmpty(p.getScriptText())) - && isTheLastParagraph) { - Paragraph newPara = p.getNote().addNewParagraph(p.getAuthenticationInfo()); - broadcastNewParagraph(p.getNote(), newPara); - } - } - }); + getNotebookService().runParagraph(noteId, paragraphId, title, text, params, config, + false, false, getServiceContext(fromMessage), + new WebSocketServiceCallback<Paragraph>(conn) { + @Override + public void onSuccess(Paragraph p, ServiceContext context) throws IOException { + super.onSuccess(p, context); + if (p.getNote().isPersonalizedMode()) { + Paragraph p2 = p.getNote().clearPersonalizedParagraphOutput(paragraphId, + context.getAutheInfo().getUser()); + connectionManager.unicastParagraph(p.getNote(), p2, context.getAutheInfo().getUser()); + } + + // if it's the last paragraph and not empty, let's add a new one + boolean isTheLastParagraph = p.getNote().isLastParagraph(paragraphId); + if (!(Strings.isNullOrEmpty(p.getText()) || + Strings.isNullOrEmpty(p.getScriptText())) && + isTheLastParagraph) { + Paragraph newPara = p.getNote().addNewParagraph(p.getAuthenticationInfo()); + broadcastNewParagraph(p.getNote(), newPara); + } + } + }); } private void addNewParagraphIfLastParagraphIsExecuted(Note note, Paragraph p) { // if it's the last paragraph and not empty, let's add a new one boolean isTheLastParagraph = note.isLastParagraph(p.getId()); - if (!(Strings.isNullOrEmpty(p.getText()) || Strings.isNullOrEmpty(p.getScriptText())) - && isTheLastParagraph) { + if (!(Strings.isNullOrEmpty(p.getText()) || + Strings.isNullOrEmpty(p.getScriptText())) && + isTheLastParagraph) { Paragraph newPara = note.addNewParagraph(p.getAuthenticationInfo()); broadcastNewParagraph(note, newPara); } } - /** @return false if failed to save a note */ + /** + * @return false if failed to save a note + */ private boolean persistNoteWithAuthInfo(NotebookSocket conn, Note note, Paragraph p) throws IOException { try { @@ -1716,26 +1542,17 @@ public class NotebookServer extends WebSocketServlet return true; } catch (IOException ex) { LOG.error("Exception from run", ex); - conn.send( - serializeMessage( - new Message(OP.ERROR_INFO) - .put( - "info", - "Oops! There is something wrong with the notebook file system. " - + "Please check the logs for more details."))); + conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", + "Oops! There is something wrong with the notebook file system. " + + "Please check the logs for more details."))); // don't run the paragraph when there is error on persisting the note information return false; } } - private Paragraph setParagraphUsingMessage( - Note note, - Message fromMessage, - String paragraphId, - String text, - String title, - Map<String, Object> params, - Map<String, Object> config) { + private Paragraph setParagraphUsingMessage(Note note, Message fromMessage, String paragraphId, + String text, String title, Map<String, Object> params, + Map<String, Object> config) { Paragraph p = note.getParagraph(paragraphId); p.setText(text); p.setTitle(title); @@ -1757,137 +1574,110 @@ public class NotebookServer extends WebSocketServlet return p; } - private void sendAllConfigurations(NotebookSocket conn, Message message) throws IOException { + private void sendAllConfigurations(NotebookSocket conn, + Message message ) throws IOException { - configurationService.getAllProperties( - getServiceContext(message), + configurationService.getAllProperties(getServiceContext(message), new WebSocketServiceCallback<Map<String, String>>(conn) { @Override - public void onSuccess(Map<String, String> properties, ServiceContext context) - throws IOException { + public void onSuccess(Map<String, String> properties, + ServiceContext context) throws IOException { super.onSuccess(properties, context); properties.put("isRevisionSupported", String.valueOf(notebook().isRevisionSupported())); - conn.send( - serializeMessage( - new Message(OP.CONFIGURATIONS_INFO).put("configurations", properties))); + conn.send(serializeMessage( + new Message(OP.CONFIGURATIONS_INFO).put("configurations", properties))); } }); } - private void checkpointNote(NotebookSocket conn, Message fromMessage) throws IOException { + private void checkpointNote(NotebookSocket conn, Message fromMessage) + throws IOException { String noteId = (String) fromMessage.get("noteId"); String commitMessage = (String) fromMessage.get("commitMessage"); - getNotebookService() - .checkpointNote( - noteId, - commitMessage, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Revision>(conn) { - @Override - public void onSuccess(Revision revision, ServiceContext context) throws IOException { - super.onSuccess(revision, context); - if (!Revision.isEmpty(revision)) { - List<Revision> revisions = - notebook().listRevisionHistory(noteId, context.getAutheInfo()); - conn.send( - serializeMessage( - new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); - } else { - conn.send( - serializeMessage( - new Message(OP.ERROR_INFO) - .put( - "info", - "Couldn't checkpoint note revision: possibly storage doesn't support versioning. " - + "Please check the logs for more details."))); - } - } - }); + getNotebookService().checkpointNote(noteId, commitMessage, getServiceContext(fromMessage), + new WebSocketServiceCallback<Revision>(conn) { + @Override + public void onSuccess(Revision revision, ServiceContext context) throws IOException { + super.onSuccess(revision, context); + if (!Revision.isEmpty(revision)) { + List<Revision> revisions = + notebook().listRevisionHistory(noteId, context.getAutheInfo()); + conn.send( + serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", + revisions))); + } else { + conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", + "Couldn't checkpoint note revision: possibly storage doesn't support versioning. " + + "Please check the logs for more details."))); + } + } + }); } - private void listRevisionHistory(NotebookSocket conn, Message fromMessage) throws IOException { + private void listRevisionHistory(NotebookSocket conn, Message fromMessage) + throws IOException { String noteId = (String) fromMessage.get("noteId"); - getNotebookService() - .listRevisionHistory( - noteId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<List<Revision>>(conn) { - @Override - public void onSuccess(List<Revision> revisions, ServiceContext context) - throws IOException { - super.onSuccess(revisions, context); - conn.send( - serializeMessage( - new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); - } - }); + getNotebookService().listRevisionHistory(noteId, getServiceContext(fromMessage), + new WebSocketServiceCallback<List<Revision>>(conn) { + @Override + public void onSuccess(List<Revision> revisions, ServiceContext context) + throws IOException { + super.onSuccess(revisions, context); + conn.send(serializeMessage( + new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); + } + }); } - private void setNoteRevision(NotebookSocket conn, Message fromMessage) throws IOException { + private void setNoteRevision(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String revisionId = (String) fromMessage.get("revisionId"); - getNotebookService() - .setNoteRevision( - noteId, - revisionId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - Note reloadedNote = notebook().loadNoteFromRepo(noteId, context.getAutheInfo()); - conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", true))); - broadcastNote(reloadedNote); - } - }); + getNotebookService().setNoteRevision(noteId, revisionId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + Note reloadedNote = notebook().loadNoteFromRepo(noteId, context.getAutheInfo()); + conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", true))); + broadcastNote(reloadedNote); + } + }); } - private void getNoteByRevision(NotebookSocket conn, Message fromMessage) throws IOException { + private void getNoteByRevision(NotebookSocket conn, Message fromMessage) + throws IOException { String noteId = (String) fromMessage.get("noteId"); String revisionId = (String) fromMessage.get("revisionId"); - getNotebookService() - .getNotebyRevision( - noteId, - revisionId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - conn.send( - serializeMessage( - new Message(OP.NOTE_REVISION) - .put("noteId", noteId) - .put("revisionId", revisionId) - .put("note", note))); - } - }); + getNotebookService().getNotebyRevision(noteId, revisionId, getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + conn.send(serializeMessage( + new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId) + .put("note", note))); + } + }); } - private void getNoteByRevisionForCompare(NotebookSocket conn, Message fromMessage) - throws IOException { + private void getNoteByRevisionForCompare(NotebookSocket conn, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String revisionId = (String) fromMessage.get("revisionId"); String position = (String) fromMessage.get("position"); - getNotebookService() - .getNoteByRevisionForCompare( - noteId, - revisionId, - getServiceContext(fromMessage), - new WebSocketServiceCallback<Note>(conn) { - @Override - public void onSuccess(Note note, ServiceContext context) throws IOException { - super.onSuccess(note, context); - conn.send( - serializeMessage( - new Message(OP.NOTE_REVISION_FOR_COMPARE) - .put("noteId", noteId) - .put("revisionId", revisionId) - .put("position", position) - .put("note", note))); - } - }); + getNotebookService().getNoteByRevisionForCompare(noteId, revisionId, + getServiceContext(fromMessage), + new WebSocketServiceCallback<Note>(conn) { + @Override + public void onSuccess(Note note, ServiceContext context) throws IOException { + super.onSuccess(note, context); + conn.send(serializeMessage( + new Message(OP.NOTE_REVISION_FOR_COMPARE).put("noteId", noteId) + .put("revisionId", revisionId).put("position", position).put("note", note))); + } + }); } /** @@ -1897,12 +1687,8 @@ public class NotebookServer extends WebSocketServlet */ @Override public void onOutputAppend(String noteId, String paragraphId, int index, String output) { - Message msg = - new Message(OP.PARAGRAPH_APPEND_OUTPUT) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("index", index) - .put("data", output); + Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId) + .put("paragraphId", paragraphId).put("index", index).put("data", output); connectionManager.broadcast(noteId, msg); } @@ -1912,15 +1698,10 @@ public class NotebookServer extends WebSocketServlet * @param output output to update (replace) */ @Override - public void onOutputUpdated( - String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { - Message msg = - new Message(OP.PARAGRAPH_UPDATE_OUTPUT) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("index", index) - .put("type", type) - .put("data", output); + public void onOutputUpdated(String noteId, String paragraphId, int index, + InterpreterResult.Type type, String output) { + Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId) + .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output); Note note = notebook().getNote(noteId); if (note.isPersonalizedMode()) { String user = note.getParagraph(paragraphId).getUser(); @@ -1932,7 +1713,9 @@ public class NotebookServer extends WebSocketServlet } } - /** This callback is for the paragraph that runs on ZeppelinServer. */ + /** + * This callback is for the paragraph that runs on ZeppelinServer. + */ @Override public void onOutputClear(String noteId, String paragraphId) { Notebook notebook = notebook(); @@ -1942,69 +1725,51 @@ public class NotebookServer extends WebSocketServlet broadcastParagraph(note, paragraph); } - /** When application append output. */ + /** + * When application append output. + */ @Override - public void onOutputAppend( - String noteId, String paragraphId, int index, String appId, String output) { + public void
<TRUNCATED>
